Initial commit

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
dinlo
2026-05-31 18:46:09 +08:00
commit b88ccf3b4b
24 changed files with 3934 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
# Telegram Bot для генерации изображений через Stable Diffusion
+513
View File
@@ -0,0 +1,513 @@
"""Обработчики админ-панели для управления пользователями."""
import logging
import math
from datetime import datetime, timedelta
from aiogram import Router, F
from aiogram.types import CallbackQuery, Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.filters import Command
from config import settings
from database.database import db
router = Router()
logger = logging.getLogger(__name__)
# Константы для пагинации
USERS_PER_PAGE = 10
class AddUserState(StatesGroup):
"""Состояния для процесса добавления пользователя."""
waiting_for_user_id = State()
waiting_for_access_type = State() # unlimited, time, generations
waiting_for_time_days = State()
waiting_for_generations_limit = State()
def is_admin(user_id: int) -> bool:
"""Проверить, является ли пользователь админом."""
return user_id == settings.ADMIN_ID
def check_admin(handler):
"""Декоратор для проверки прав админа."""
async def wrapper(callback_or_message, *args, **kwargs):
# Фильтруем лишние kwargs, которые не ожидаются хендлером
# Оставляем только state и другие FSM-аргументы
allowed_kwargs = {'state', 'session', 'middleware_data'}
filtered_kwargs = {k: v for k, v in kwargs.items() if k in allowed_kwargs}
if isinstance(callback_or_message, CallbackQuery):
user_id = callback_or_message.from_user.id
elif isinstance(callback_or_message, Message):
user_id = callback_or_message.from_user.id
else:
return await callback_or_message.answer("⛔ Ошибка определения пользователя.")
if not is_admin(user_id):
if isinstance(callback_or_message, CallbackQuery):
await callback_or_message.answer("⛔ Недостаточно прав.", show_alert=True)
else:
await callback_or_message.answer("⛔ Недостаточно прав.")
return None
return await handler(callback_or_message, *args, **filtered_kwargs)
return wrapper
@router.message(Command("admin"))
@check_admin
async def cmd_admin(message: Message, state: FSMContext):
"""Админ-панель."""
await state.clear()
await show_admin_panel(message)
@router.callback_query(F.data == "admin_panel")
@check_admin
async def admin_panel(callback: CallbackQuery, state: FSMContext):
"""Показать админ-панель."""
await state.clear()
await show_admin_panel(callback.message)
await callback.answer()
async def show_admin_panel(target):
"""Отобразить админ-панель."""
all_users = await db.get_all_users()
active_users = [u for u in all_users if u["is_active"]]
total_gens = sum(u.get("used_generations", 0) for u in all_users)
text = (
"🛡️ <b>Админ-панель</b>\n\n"
f"👥 Всего пользователей: <b>{len(all_users)}</b>\n"
f"✅ Активных: <b>{len(active_users)}</b>\n"
f"🎨 Всего генераций: <b>{total_gens}</b>\n\n"
"<b>Управление:</b>"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="➕ Добавить пользователя", callback_data="admin_add_user")],
[InlineKeyboardButton(text="📋 Список пользователей", callback_data="admin_users_list")],
[InlineKeyboardButton(text="🧹 Очистить истёкшие доступы", callback_data="admin_cleanup_expired")],
[InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")],
])
if hasattr(target, 'answer'):
try:
await target.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
except Exception:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
# --- Добавление пользователя ---
@router.callback_query(F.data == "admin_add_user")
@check_admin
async def admin_add_user(callback: CallbackQuery, state: FSMContext):
"""Начать процесс добавления пользователя."""
await state.set_state(AddUserState.waiting_for_user_id)
await callback.message.edit_text(
"➕ <b>Добавление нового пользователя</b>\n\n"
"Введите <b>Telegram User ID</b> пользователя.\n\n"
"💡 <i>Чтобы узнать ID, пользователь может отправить "
"команду /start боту @userinfobot</i>\n\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(AddUserState.waiting_for_user_id)
@check_admin
async def process_user_id(message: Message, state: FSMContext):
"""Обработка введённого User ID."""
text = message.text.strip()
if not text.isdigit():
await message.answer("❌ Введите корректный числовой ID.")
return
user_id = int(text)
# Проверяем, не админ ли это
if user_id == settings.ADMIN_ID:
await message.answer("❌ Нельзя добавить самого себя как обычного пользователя.")
return
# Проверяем, существует ли уже пользователь
existing = await db.get_user_by_id(user_id)
if existing:
status_text = "активен" if existing["is_active"] else "неактивен"
await message.answer(
f"⚠️ Пользователь <code>{user_id}</code> уже существует в базе.\n"
f"Статус: {status_text}\n"
f"Его параметры будут обновлены.\n\n"
"Продолжить? (да/нет)"
)
# Сохраняем и переходим к выбору типа доступа
await state.update_data(user_id=user_id, existing_user=True)
else:
await state.update_data(user_id=user_id, existing_user=False)
await state.set_state(AddUserState.waiting_for_access_type)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="♾️ Без ограничений", callback_data="access_unlimited")],
[InlineKeyboardButton(text="⏰ По времени (дни)", callback_data="access_time")],
[InlineKeyboardButton(text="🎨 По количеству генераций", callback_data="access_gens")],
[InlineKeyboardButton(text="❌ Отмена", callback_data="admin_panel")],
])
await message.answer(
"📋 <b>Выберите тип доступа:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("access_"), AddUserState.waiting_for_access_type)
@check_admin
async def process_access_type(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора типа доступа."""
access_type = callback.data.replace("access_", "")
if access_type == "unlimited":
await state.update_data(access_type="unlimited", access_expires_at=None, max_generations=None)
await _confirm_and_add_user(callback, state)
elif access_type == "time":
await state.set_state(AddUserState.waiting_for_time_days)
await state.update_data(access_type="time")
await callback.message.edit_text(
"⏰ <b>Введите количество дней</b> доступа:\n\n"
"Например: 7, 30, 90, 365\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
elif access_type == "gens":
await state.set_state(AddUserState.waiting_for_generations_limit)
await state.update_data(access_type="gens")
await callback.message.edit_text(
"🎨 <b>Введите максимальное количество генераций:</b>\n\n"
"Например: 10, 50, 100\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(AddUserState.waiting_for_time_days)
@check_admin
async def process_time_days(message: Message, state: FSMContext):
"""Обработка введённого количества дней."""
text = message.text.strip()
if not text.isdigit():
await message.answer("❌ Введите корректное число дней.")
return
days = int(text)
if days < 1 or days > 3650:
await message.answer("❌ Количество дней должно быть от 1 до 3650.")
return
expires_at = datetime.now() + timedelta(days=days)
await state.update_data(access_expires_at=expires_at, max_generations=None)
await _confirm_and_add_user(message, state)
@router.message(AddUserState.waiting_for_generations_limit)
@check_admin
async def process_gens_limit(message: Message, state: FSMContext):
"""Обработка введённого лимита генераций."""
text = message.text.strip()
if not text.isdigit():
await message.answer("❌ Введите корректное число.")
return
max_gens = int(text)
if max_gens < 1 or max_gens > 100000:
await message.answer("❌ Лимит должен быть от 1 до 100000.")
return
await state.update_data(max_generations=max_gens, access_expires_at=None)
await _confirm_and_add_user(message, state)
async def _confirm_and_add_user(target, state: FSMContext):
"""Подтвердить и добавить пользователя."""
data = await state.get_data()
user_id = data.get("user_id")
if user_id is None:
await target.answer("❌ Ошибка: не указан User ID.")
await state.clear()
return
access_expires_at = data.get("access_expires_at")
max_generations = data.get("max_generations")
# Добавляем/обновляем пользователя
await db.add_user(
user_id=user_id,
added_by=target.from_user.id,
access_expires_at=access_expires_at,
max_generations=max_generations,
)
# Формируем описание доступа
if access_expires_at:
expires_str = access_expires_at.strftime("%d.%m.%Y %H:%M")
access_desc = f"⏰ До: <b>{expires_str}</b>"
elif max_generations:
access_desc = f"🎨 Лимит: <b>{max_generations}</b> генераций"
else:
access_desc = "♾️ Без ограничений"
text = (
f"✅ <b>Пользователь <code>{user_id}</code> добавлен!</b>\n\n"
f"{access_desc}"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=" Добавить ещё", callback_data="admin_add_user")],
[InlineKeyboardButton(text="📋 Список пользователей", callback_data="admin_users_list")],
[InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel")],
])
if hasattr(target, 'edit_text'):
try:
await target.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
except Exception:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
await state.clear()
# --- Список пользователей ---
@router.callback_query(F.data == "admin_users_list")
@check_admin
async def admin_users_list(callback: CallbackQuery, state: FSMContext):
"""Показать список пользователей."""
await state.clear()
await _show_users_list(callback.message, 0)
await callback.answer()
@router.callback_query(F.data.startswith("admin_users_page_"))
@check_admin
async def admin_users_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы пользователей."""
page = int(callback.data.split("_")[-1])
await _show_users_list(callback.message, page)
await callback.answer()
async def _show_users_list(target, page: int):
"""Отобразить список пользователей."""
all_users = await db.get_all_users()
total_pages = math.ceil(len(all_users) / USERS_PER_PAGE) if all_users else 1
if page >= total_pages:
page = total_pages - 1
start = page * USERS_PER_PAGE
end = start + USERS_PER_PAGE
page_users = all_users[start:end]
if not all_users:
text = "📋 <b>Список пользователей пуст</b>"
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="➕ Добавить пользователя", callback_data="admin_add_user")],
[InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel")],
])
else:
text = f"👥 <b>Пользователи ({len(all_users)})</b>\n\n"
for i, user in enumerate(page_users, start + 1):
status = "👑" if user["is_admin"] else ("" if user["is_active"] else "")
# Формируем описание доступа
if user["is_admin"]:
access_info = "админ"
elif user.get("access_expires_at"):
try:
exp = datetime.fromisoformat(user["access_expires_at"])
if exp > datetime.now():
days_left = (exp - datetime.now()).days
access_info = f"до {exp.strftime('%d.%m.%Y')} ({days_left} дн.)"
else:
access_info = "истёк"
except Exception:
access_info = ""
elif user.get("max_generations"):
used = user.get("used_generations", 0)
access_info = f"{used}/{user['max_generations']} ген."
else:
access_info = "без ограничений"
name = user.get("display_name") or user.get("username") or f"ID: {user['user_id']}"
text += f"{i}. {status} <code>{user['user_id']}</code> — {name}\n"
text += f" {access_info}\n\n"
# Навигация
nav_row = []
if page > 0:
nav_row.append(InlineKeyboardButton(text="⬅️", callback_data=f"admin_users_page_{page - 1}"))
nav_row.append(InlineKeyboardButton(text=f"{page + 1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav_row.append(InlineKeyboardButton(text="➡️", callback_data=f"admin_users_page_{page + 1}"))
kb_buttons = []
for user in page_users:
if not user["is_admin"]:
kb_buttons.append([
InlineKeyboardButton(
text=f"{'' if user['is_active'] else ''} {user['user_id']}",
callback_data=f"admin_user_manage_{user['user_id']}"
)
])
kb_buttons.append(nav_row)
kb_buttons.append([InlineKeyboardButton(text=" Добавить", callback_data="admin_add_user")])
kb_buttons.append([InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel")])
keyboard = InlineKeyboardMarkup(inline_keyboard=kb_buttons)
if hasattr(target, 'edit_text'):
try:
await target.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
except Exception:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
# --- Управление конкретным пользователем ---
@router.callback_query(F.data.startswith("admin_user_manage_"))
@check_admin
async def admin_user_manage(callback: CallbackQuery):
"""Управление конкретным пользователем."""
user_id = int(callback.data.split("_")[-1])
user = await db.get_user_by_id(user_id)
if not user:
await callback.answer("Пользователь не найден.", show_alert=True)
return
# Формируем информацию
status = "👑 Админ" if user["is_admin"] else ("✅ Активен" if user["is_active"] else "❌ Неактивен")
if user.get("access_expires_at"):
try:
exp = datetime.fromisoformat(user["access_expires_at"])
expires_str = exp.strftime("%d.%m.%Y %H:%M")
except Exception:
expires_str = ""
else:
expires_str = "не указан"
max_gens = user.get("max_generations") or "без ограничений"
used_gens = user.get("used_generations", 0)
added_by = user.get("added_by", "")
added_at = user.get("added_at", "")
name = user.get("display_name") or user.get("username") or f"ID: {user_id}"
text = (
f"👤 <b>{name}</b>\n\n"
f"ID: <code>{user_id}</code>\n"
f"Статус: {status}\n"
f"Добавлен: {added_at} (админом {added_by})\n"
f"Истекает: {expires_str}\n"
f"Генерации: {used_gens}/{max_gens}"
)
kb_buttons = []
if not user["is_admin"]:
if user["is_active"]:
kb_buttons.append([InlineKeyboardButton(text="🚫 Заблокировать", callback_data=f"admin_user_block_{user_id}")])
else:
kb_buttons.append([InlineKeyboardButton(text="✅ Разблокировать", callback_data=f"admin_user_unblock_{user_id}")])
kb_buttons.append([InlineKeyboardButton(text="🔄 Обновить доступ", callback_data=f"admin_user_update_{user_id}")])
kb_buttons.append([InlineKeyboardButton(text="🗑️ Удалить", callback_data=f"admin_user_delete_{user_id}")])
kb_buttons.append([InlineKeyboardButton(text="⬅️ Назад к списку", callback_data="admin_users_list")])
keyboard = InlineKeyboardMarkup(inline_keyboard=kb_buttons)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data.startswith("admin_user_block_"))
@check_admin
async def admin_user_block(callback: CallbackQuery):
"""Заблокировать пользователя."""
user_id = int(callback.data.split("_")[-1])
await db.remove_user(user_id)
await callback.answer("🚫 Пользователь заблокирован.")
await admin_user_manage(callback)
@router.callback_query(F.data.startswith("admin_user_unblock_"))
@check_admin
async def admin_user_unblock(callback: CallbackQuery):
"""Разблокировать пользователя."""
user_id = int(callback.data.split("_")[-1])
await db.update_user_access(user_id)
await callback.answer("✅ Пользователь разблокирован.")
await admin_user_manage(callback)
@router.callback_query(F.data.startswith("admin_user_delete_"))
@check_admin
async def admin_user_delete(callback: CallbackQuery):
"""Удалить пользователя (окончательно)."""
user_id = int(callback.data.split("_")[-1])
# В текущей схеме просто деактивируем
await db.remove_user(user_id)
await callback.answer("🗑️ Пользователь удалён.")
await admin_users_list(callback)
@router.callback_query(F.data.startswith("admin_user_update_"))
@check_admin
async def admin_user_update(callback: CallbackQuery, state: FSMContext):
"""Обновить параметры доступа пользователя."""
user_id = int(callback.data.split("_")[-1])
await state.update_data(update_user_id=user_id)
await state.set_state(AddUserState.waiting_for_access_type)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="♾️ Без ограничений", callback_data="access_unlimited")],
[InlineKeyboardButton(text="⏰ По времени (дни)", callback_data="access_time")],
[InlineKeyboardButton(text="🎨 По количеству генераций", callback_data="access_gens")],
[InlineKeyboardButton(text="❌ Отмена", callback_data=f"admin_user_manage_{user_id}")],
])
await callback.message.edit_text(
"🔄 <b>Обновление параметров доступа</b>\n\n"
"Выберите новый тип доступа:",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
# --- Очистка истёкших доступов ---
@router.callback_query(F.data == "admin_cleanup_expired")
@check_admin
async def admin_cleanup(callback: CallbackQuery):
"""Очистка истёкших доступов."""
deactivated = await db.deactivate_expired_users()
await callback.answer(f"🧹 Деактивировано пользователей: {deactivated}", show_alert=True)
await admin_panel(callback)
+241
View File
@@ -0,0 +1,241 @@
"""Обработчики генерации изображений (txt2img)."""
import logging
import os
import uuid
from datetime import datetime
from html import escape
from aiogram import Router, F
from aiogram.types import (
Message,
CallbackQuery,
InlineKeyboardMarkup,
InlineKeyboardButton,
FSInputFile,
)
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from config import settings
from database.database import db
from sd.sd_client import sd_client
from utils.image_manager import get_image_path
router = Router()
logger = logging.getLogger(__name__)
class Txt2ImgState(StatesGroup):
"""Состояния для процесса txt2img."""
waiting_for_prompt = State()
waiting_for_negative_prompt = State()
generating = State()
def get_generation_keyboard() -> InlineKeyboardMarkup:
"""Клавиатура для генерации txt2img."""
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="📝 Ввести промпт", callback_data="txt2img_enter_prompt"),
],
[
InlineKeyboardButton(text="📋 Использовать профиль", callback_data="txt2img_use_profile"),
],
[
InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu"),
],
])
@router.callback_query(F.data == "gen_txt2img")
async def start_txt2img(callback: CallbackQuery, state: FSMContext):
"""Начать процесс txt2img."""
await state.clear()
await callback.message.answer(
"🎨 <b>Генерация изображения (txt2img)</b>\n\n"
"Введите описание изображения (промпт).\n"
"Можете использовать <code>/cancel</code> для отмены.",
parse_mode="HTML",
reply_markup=get_generation_keyboard(),
)
await callback.answer()
@router.callback_query(F.data == "txt2img_enter_prompt")
async def enter_prompt_txt(callback: CallbackQuery, state: FSMContext):
"""Ввод промпта для txt2img."""
await state.set_state(Txt2ImgState.waiting_for_prompt)
await callback.message.edit_text(
"📝 <b>Введите промпт:</b>\n\n"
"Опишите изображение, которое хотите сгенерировать.\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.callback_query(F.data == "txt2img_use_profile")
async def use_profile_txt(callback: CallbackQuery, state: FSMContext):
"""Использовать профиль для txt2img."""
profiles = await db.get_user_profiles(callback.from_user.id)
if not profiles:
await callback.answer("У вас пока нет профилей. Создайте профиль в разделе 'Профили'.", show_alert=True)
return
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=p["name"], callback_data=f"txt2img_profile_{p['id']}")]
for p in profiles
])
keyboard.inline_keyboard.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="gen_txt2img")])
await state.set_state(Txt2ImgState.waiting_for_prompt)
await callback.message.edit_text(
"📋 <b>Выберите профиль:</b>\n\n"
"После выбора будет запрошен промпт.",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.callback_query(F.data.startswith("txt2img_profile_"))
async def select_profile_txt(callback: CallbackQuery, state: FSMContext):
"""Выбор профиля для txt2img."""
profile_id = int(callback.data.split("_")[-1])
await state.update_data(profile_id=profile_id)
profile = await db.get_profile(profile_id)
await state.set_state(Txt2ImgState.waiting_for_prompt)
await callback.message.edit_text(
f"✅ Профиль <b>{profile['name']}</b> выбран.\n\n"
f"Параметры: {profile['width']}x{profile['height']}, "
f"шагов: {profile['steps']}, CFG: {profile['cfg_scale']}, "
f"сэмплер: {profile['sampler']}, шедулер: {profile.get('scheduler', 'automatic')}\n\n"
f"📝 <b>Введите промпт:</b>\n"
f"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(Txt2ImgState.waiting_for_prompt, F.text)
async def process_prompt_txt(message: Message, state: FSMContext):
"""Обработка введённого промпта для txt2img."""
prompt = message.text.strip()
if not prompt:
await message.answer("Промпт не может быть пустым. Введите описание изображения.")
return
await state.update_data(prompt=prompt)
# Запрашиваем негативный промпт
await state.set_state(Txt2ImgState.waiting_for_negative_prompt)
await message.answer(
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
"Опишите, что НЕ должно быть на изображении.\n"
"Отправьте <code>-</code> чтобы пропустить.\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
@router.message(Txt2ImgState.waiting_for_negative_prompt)
async def process_negative_prompt_txt(message: Message, state: FSMContext):
"""Обработка негативного промпта для txt2img."""
negative_prompt = message.text.strip()
if negative_prompt == "-":
negative_prompt = ""
await state.update_data(negative_prompt=negative_prompt)
await state.set_state(Txt2ImgState.generating)
# Получаем данные
data = await state.get_data()
prompt = data.get("prompt", "")
profile_id = data.get("profile_id")
# Загружаем настройки пользователя для TTL
user_settings = await db.get_user_settings(message.from_user.id)
ttl_hours = user_settings.get("image_ttl_hours", settings.DEFAULT_IMAGE_TTL_HOURS)
# Если есть профиль, используем его настройки
profile = None
if profile_id:
profile = await db.get_profile(profile_id)
# Отправляем сообщение о начале генерации
status_msg = await message.answer("⏳ <b>Генерация изображения...</b>\n\nЭто может занять некоторое время.", parse_mode="HTML")
# Вызываем API
result = await sd_client.txt2img(
prompt=prompt,
negative_prompt=negative_prompt or (profile["negative_prompt"] if profile else ""),
width=profile["width"] if profile else 512,
height=profile["height"] if profile else 512,
steps=profile["steps"] if profile else 20,
cfg_scale=profile["cfg_scale"] if profile else 7.0,
sampler=profile["sampler"] if profile else "Euler a",
scheduler=profile.get("scheduler", "automatic") if profile else "automatic",
model=profile.get("model") if profile else None,
lora=profile.get("lora") if profile and profile.get("lora") else None,
lora_strength=profile.get("lora_strength", 0.8) if profile else 0.8,
)
if result is None:
await status_msg.edit_text("❌ <b>Ошибка генерации.</b>\n\nПроверьте соединение с SD API и попробуйте снова.")
await state.clear()
return
image_bytes, info = result
# Сохраняем изображение
filename = f"{uuid.uuid4().hex[:12]}.png"
file_path = get_image_path(message.from_user.id, filename)
try:
with open(file_path, "wb") as f:
f.write(image_bytes)
except Exception as e:
logger.error(f"Ошибка сохранения изображения: {e}")
await status_msg.edit_text("❌ <b>Ошибка сохранения изображения.</b>")
await state.clear()
return
# Сохраняем в БД
await db.add_generated_image(
user_id=message.from_user.id,
file_path=file_path,
prompt=prompt,
ttl_hours=ttl_hours,
)
# Увеличиваем счётчик генераций
await db.increment_generation_count(message.from_user.id)
# Отправляем изображение
info_text = (
f"✅ <b>Изображение сгенерировано!</b>\n\n"
f"<b>Промпт:</b> <code>{escape(info['prompt'][:500])}</code>\n"
f"<b>Размер:</b> {info['width']}x{info['height']}\n"
f"<b>Шагов:</b> {info['steps']}\n"
f"<b>CFG Scale:</b> {info['cfg_scale']}\n"
f"<b>Сэмплер:</b> {info['sampler']}\n"
f"<b>Шедулер:</b> {info.get('scheduler', 'automatic')}\n"
f"<b>Seed:</b> <code>{info['seed']}</code>\n"
f"<b>Модель:</b> {info['model']}\n"
f"<b>Время хранения:</b> {ttl_hours} ч."
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔄 Сгенерировать ещё", callback_data="gen_txt2img")],
[InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")],
])
await message.answer_photo(
photo=FSInputFile(file_path),
caption=info_text,
parse_mode="HTML",
reply_markup=keyboard,
)
await status_msg.delete()
await state.clear()
+634
View File
@@ -0,0 +1,634 @@
"""Обработчики управления профилями."""
import logging
import math
from aiogram import Router, F
from aiogram.types import CallbackQuery, Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from database.database import db
from sd.sd_client import sd_client
router = Router()
logger = logging.getLogger(__name__)
# Константы для пагинации
ITEMS_PER_PAGE = 8
class ProfileCreateState(StatesGroup):
"""Состояния для создания профиля."""
waiting_for_name = State()
waiting_for_width = State()
waiting_for_height = State()
waiting_for_steps = State()
waiting_for_cfg = State()
waiting_for_sampler = State()
waiting_for_scheduler = State()
waiting_for_model = State()
waiting_for_lora = State()
waiting_for_lora_strength = State()
waiting_for_negative_prompt = State()
def _build_pagination_keyboard(items: list[str], current_page: int, callback_prefix: str, skip_callback: str) -> InlineKeyboardMarkup:
"""Построить клавиатуру с пагинацией."""
total_pages = math.ceil(len(items) / ITEMS_PER_PAGE) if items else 1
start = current_page * ITEMS_PER_PAGE
end = start + ITEMS_PER_PAGE
page_items = items[start:end]
keyboard = []
for item in page_items:
# Callback data ограничен 64 байтами
safe_data = item[:56]
keyboard.append([InlineKeyboardButton(
text=item[:30],
callback_data=f"{callback_prefix}_{safe_data}"
)])
# Навигация
nav_row = []
if current_page > 0:
nav_row.append(InlineKeyboardButton(text="⬅️", callback_data=f"{callback_prefix}_page_{current_page - 1}"))
nav_row.append(InlineKeyboardButton(text=f"{current_page + 1}/{total_pages}", callback_data="noop"))
if current_page < total_pages - 1:
nav_row.append(InlineKeyboardButton(text="➡️", callback_data=f"{callback_prefix}_page_{current_page + 1}"))
keyboard.append(nav_row)
keyboard.append([InlineKeyboardButton(text="⏭️ Пропустить", callback_data=skip_callback)])
return InlineKeyboardMarkup(inline_keyboard=keyboard)
@router.callback_query(F.data == "profiles_menu")
async def profiles_menu(callback: CallbackQuery):
"""Меню профилей."""
profiles = await db.get_user_profiles(callback.from_user.id)
if not profiles:
text = (
"📋 <b>Управление профилями</b>\n\n"
"У вас пока нет профилей. Создайте первый профиль для быстрой генерации."
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=" Создать профиль", callback_data="profile_create")],
[InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")],
])
else:
text = "📋 <b>Ваши профили:</b>\n\n"
kb_buttons = []
for p in profiles:
default_mark = "" if p["is_default"] else ""
text += f"<b>{p['name']}</b>{default_mark}\n"
text += f" Размер: {p['width']}x{p['height']}, Шагов: {p['steps']}, CFG: {p['cfg_scale']}\n"
text += f" Сэмплер: {p['sampler']}, Шедулер: {p.get('scheduler', 'automatic')}\n"
if p.get("model"):
text += f" Модель: {p['model']}\n"
if p.get("lora"):
text += f" LoRA: {p['lora']} ({p['lora_strength']})\n"
text += "\n"
kb_buttons.append([
InlineKeyboardButton(
text=f"{'' if p['is_default'] else ''}{p['name']}",
callback_data=f"profile_view_{p['id']}"
)
])
kb_buttons.append([InlineKeyboardButton(text=" Создать профиль", callback_data="profile_create")])
kb_buttons.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")])
keyboard = InlineKeyboardMarkup(inline_keyboard=kb_buttons)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "profile_create")
async def profile_create(callback: CallbackQuery, state: FSMContext):
"""Начать создание профиля."""
await state.set_state(ProfileCreateState.waiting_for_name)
await callback.message.edit_text(
"➕ <b>Создание нового профиля</b>\n\n"
"Введите <b>название</b> профиля:\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(ProfileCreateState.waiting_for_name)
async def profile_name(message: Message, state: FSMContext):
"""Обработка названия профиля."""
name = message.text.strip()
if len(name) < 2:
await message.answer("Название должно быть не менее 2 символов.")
return
await state.update_data(name=name)
await state.set_state(ProfileCreateState.waiting_for_width)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="512x512", callback_data="size_512_512"),
InlineKeyboardButton(text="768x768", callback_data="size_768_768"),
],
[
InlineKeyboardButton(text="512x768 (портрет)", callback_data="size_512_768"),
InlineKeyboardButton(text="768x512 (ландшафт)", callback_data="size_768_512"),
],
[
InlineKeyboardButton(text="1024x1024", callback_data="size_1024_1024"),
],
[
InlineKeyboardButton(text="✏️ Ввести свой размер", callback_data="size_custom"),
],
])
await message.answer(
"📐 <b>Выберите размер изображения:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("size_"), ProfileCreateState.waiting_for_width)
async def profile_size(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора размера."""
if callback.data == "size_custom":
await callback.message.edit_text(
"📐 <b>Введите ширину</b> (например, 512, 768, 1024):",
parse_mode="HTML",
)
return
parts = callback.data.replace("size_", "").split("_")
width, height = int(parts[0]), int(parts[1])
await state.update_data(width=width, height=height)
await state.set_state(ProfileCreateState.waiting_for_steps)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="20 (быстро)", callback_data="steps_20"),
InlineKeyboardButton(text="30 (качество)", callback_data="steps_30"),
],
[
InlineKeyboardButton(text="40 (высокое качество)", callback_data="steps_40"),
],
[
InlineKeyboardButton(text="✏️ Своё значение", callback_data="steps_custom"),
],
])
await callback.message.edit_text(
f"✅ Размер: {width}x{height}\n\n"
"🔢 <b>Выберите количество шагов:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.message(ProfileCreateState.waiting_for_width, F.text.isdigit())
async def profile_custom_width(message: Message, state: FSMContext):
"""Ввод пользовательской ширины."""
width = int(message.text.strip())
if width < 64 or width > 2048:
await message.answer("Размер должен быть от 64 до 2048 пикселей.")
return
await state.update_data(width=width)
await message.answer("📐 <b>Теперь введите высоту:</b>")
await state.set_state(ProfileCreateState.waiting_for_height)
@router.message(ProfileCreateState.waiting_for_height, F.text.isdigit())
async def profile_height(message: Message, state: FSMContext):
"""Обработка высоты."""
height = int(message.text.strip())
if height < 64 or height > 2048:
await message.answer("Размер должен быть от 64 до 2048 пикселей.")
return
await state.update_data(height=height)
await state.set_state(ProfileCreateState.waiting_for_steps)
data = await state.get_data()
width = data.get("width", "?")
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="20 (быстро)", callback_data="steps_20"),
InlineKeyboardButton(text="30 (качество)", callback_data="steps_30"),
],
[
InlineKeyboardButton(text="40 (высокое качество)", callback_data="steps_40"),
],
[
InlineKeyboardButton(text="✏️ Своё значение", callback_data="steps_custom"),
],
])
await message.answer(
f"✅ Размер: {width}x{height}\n\n"
"🔢 <b>Выберите количество шагов:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("steps_"), ProfileCreateState.waiting_for_steps)
async def profile_steps(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора шагов."""
if callback.data == "steps_custom":
await callback.message.edit_text(
"🔢 <b>Введите количество шагов</b> (10-150):",
parse_mode="HTML",
)
return
steps = int(callback.data.replace("steps_", ""))
await state.update_data(steps=steps)
await state.set_state(ProfileCreateState.waiting_for_cfg)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="5.0 (свободнее)", callback_data="cfg_5"),
InlineKeyboardButton(text="7.0 (стандарт)", callback_data="cfg_7"),
],
[
InlineKeyboardButton(text="9.0 (строже)", callback_data="cfg_9"),
InlineKeyboardButton(text="✏️ Своё значение", callback_data="cfg_custom"),
],
])
await callback.message.edit_text(
f"✅ Шагов: {steps}\n\n"
"🎚️ <b>Выберите CFG Scale:</b>\n"
"Определяет, насколько строго модель следует промпту.",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.message(ProfileCreateState.waiting_for_steps, F.text.isdigit())
async def profile_custom_steps(message: Message, state: FSMContext):
"""Ввод пользовательских шагов."""
steps = int(message.text.strip())
if steps < 10 or steps > 150:
await message.answer("Количество шагов должно быть от 10 до 150.")
return
await state.update_data(steps=steps)
await state.set_state(ProfileCreateState.waiting_for_cfg)
await message.answer(
"🎚️ <b>Введите CFG Scale</b> (обычно 5.0-12.0):\n"
"Отправьте <code>-</code> для стандартного значения (7.0).",
parse_mode="HTML",
)
@router.callback_query(F.data.startswith("cfg_"), ProfileCreateState.waiting_for_cfg)
async def profile_cfg(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора CFG Scale."""
if callback.data == "cfg_custom":
await callback.message.edit_text(
"🎚️ <b>Введите CFG Scale</b> (1.0-30.0):",
parse_mode="HTML",
)
return
cfg = float(callback.data.replace("cfg_", ""))
await state.update_data(cfg_scale=cfg)
await state.set_state(ProfileCreateState.waiting_for_sampler)
# Получаем сэмплеры из API
samplers = await sd_client.get_samplers()
if not samplers:
samplers = ["Euler a", "Euler", "DPM++ 2M Karras", "DPM++ SDE Karras", "DDIM"]
await state.update_data(available_samplers=samplers)
keyboard = _build_pagination_keyboard(samplers, 0, "sampler", "sampler_skip")
await callback.message.edit_text(
f"✅ CFG Scale: {cfg}\n\n"
"🔄 <b>Выберите сэмплер:</b>\n"
f"Доступно: {len(samplers)}",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.callback_query(F.data.startswith("sampler_page_"), ProfileCreateState.waiting_for_sampler)
async def profile_sampler_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы сэмплеров."""
page = int(callback.data.split("_")[-1])
samplers = (await state.get_data()).get("available_samplers", [])
keyboard = _build_pagination_keyboard(samplers, page, "sampler", "sampler_skip")
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "sampler_skip", ProfileCreateState.waiting_for_sampler)
async def profile_sampler_skip(callback: CallbackQuery, state: FSMContext):
"""Пропустить выбор сэмплера."""
await state.update_data(sampler="Euler a", scheduler="automatic")
await _show_model_selection(callback, state)
@router.callback_query(F.data.startswith("sampler_"), ProfileCreateState.waiting_for_sampler)
async def profile_sampler_select(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора сэмплера."""
sampler = callback.data.replace("sampler_", "")
await state.update_data(sampler=sampler, scheduler="automatic")
await _show_model_selection(callback, state)
async def _show_model_selection(callback: CallbackQuery, state: FSMContext):
"""Показать выбор модели."""
data = await state.get_data()
sampler = data.get("sampler", "Euler a")
await callback.answer("⏳ Загружаю список моделей...", show_alert=False)
models = await sd_client.get_models()
if not models:
models = ["— Не менять —"]
else:
models = ["— Не менять —"] + models
await state.update_data(available_models=models)
await state.set_state(ProfileCreateState.waiting_for_model)
keyboard = _build_pagination_keyboard(models, 0, "model", "model_skip")
await callback.message.edit_text(
f"✅ Сэмплер: <b>{sampler}</b>\n\n"
"🤖 <b>Выберите модель:</b>\n"
f"Доступно: {len(models) - 1}",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("model_page_"), ProfileCreateState.waiting_for_model)
async def profile_model_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы моделей."""
page = int(callback.data.split("_")[-1])
models = (await state.get_data()).get("available_models", [])
keyboard = _build_pagination_keyboard(models, page, "model", "model_skip")
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "model_skip", ProfileCreateState.waiting_for_model)
async def profile_model_skip(callback: CallbackQuery, state: FSMContext):
"""Пропустить выбор модели."""
await state.update_data(model="")
await _show_lora_selection(callback, state)
@router.callback_query(F.data.startswith("model_"), ProfileCreateState.waiting_for_model)
async def profile_model_select(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора модели."""
model = callback.data.replace("model_", "")
if model == "— Не менять —":
model = ""
await state.update_data(model=model)
await _show_lora_selection(callback, state)
async def _show_lora_selection(callback: CallbackQuery, state: FSMContext):
"""Показать выбор LoRA."""
data = await state.get_data()
model = data.get("model", "")
await callback.answer("⏳ Загружаю список LoRA...", show_alert=False)
loras = await sd_client.get_loras()
await state.update_data(available_loras=loras)
await state.set_state(ProfileCreateState.waiting_for_lora)
if not loras:
await state.update_data(lora="", lora_strength=0.8)
await _show_negative_prompt_request(callback, state)
return
loras = ["— Не использовать —"] + loras
keyboard = _build_pagination_keyboard(loras, 0, "lora", "lora_skip")
await callback.message.edit_text(
f"✅ Модель: <b>{model or 'текущая'}</b>\n\n"
"🎨 <b>Выберите LoRA:</b>\n"
f"Доступно: {len(loras) - 1}",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("lora_page_"), ProfileCreateState.waiting_for_lora)
async def profile_lora_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы LoRA."""
page = int(callback.data.split("_")[-1])
loras = (await state.get_data()).get("available_loras", [])
keyboard = _build_pagination_keyboard(loras, page, "lora", "lora_skip")
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "lora_skip", ProfileCreateState.waiting_for_lora)
async def profile_lora_skip(callback: CallbackQuery, state: FSMContext):
"""Пропустить выбор LoRA."""
await state.update_data(lora="", lora_strength=0.8)
await _show_negative_prompt_request(callback, state)
@router.callback_query(F.data.startswith("lora_"), ProfileCreateState.waiting_for_lora)
async def profile_lora_select(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора LoRA."""
lora = callback.data.replace("lora_", "")
if lora == "— Не использовать —":
await state.update_data(lora="", lora_strength=0.8)
await _show_negative_prompt_request(callback, state)
return
await state.update_data(lora=lora)
await state.set_state(ProfileCreateState.waiting_for_lora_strength)
await callback.message.edit_text(
f"✅ LoRA: <b>{lora}</b>\n\n"
"💪 <b>Введите силу LoRA</b> (0.0-1.0, обычно 0.8):\n"
"Отправьте <code>-</code> для стандартного значения (0.8).",
parse_mode="HTML",
)
@router.message(ProfileCreateState.waiting_for_lora_strength)
async def profile_lora_strength(message: Message, state: FSMContext):
"""Обработка силы LoRA."""
text = message.text.strip()
if text == "-":
lora_strength = 0.8
else:
try:
lora_strength = float(text)
if lora_strength < 0.0 or lora_strength > 1.0:
await message.answer("Сила LoRA должна быть от 0.0 до 1.0.")
return
except ValueError:
await message.answer("Введите корректное число (например, 0.8).")
return
await state.update_data(lora_strength=lora_strength)
await _show_negative_prompt_request_msg(message, state)
async def _show_negative_prompt_request(callback: CallbackQuery, state: FSMContext):
"""Запросить негативный промпт (из callback)."""
data = await state.get_data()
await state.set_state(ProfileCreateState.waiting_for_negative_prompt)
lora_text = data.get("lora", "")
lora_info = f"LoRA: {lora_text} ({data.get('lora_strength', 0.8)})" if lora_text else "LoRA: нет"
await callback.message.edit_text(
f"✅ Модель: <b>{data.get('model', '') or 'текущая'}</b>\n"
f"{lora_info}\n\n"
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
"Опишите, что НЕ должно быть на изображении.\n"
"Отправьте <code>-</code> чтобы пропустить.",
parse_mode="HTML",
)
async def _show_negative_prompt_request_msg(message: Message, state: FSMContext):
"""Запросить негативный промпт (из message)."""
data = await state.get_data()
await state.set_state(ProfileCreateState.waiting_for_negative_prompt)
lora_text = data.get("lora", "")
lora_info = f"LoRA: {lora_text} ({data.get('lora_strength', 0.8)})" if lora_text else "LoRA: нет"
await message.answer(
f"✅ Модель: <b>{data.get('model', '') or 'текущая'}</b>\n"
f"{lora_info}\n\n"
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
"Опишите, что НЕ должно быть на изображении.\n"
"Отправьте <code>-</code> чтобы пропустить.",
parse_mode="HTML",
)
@router.message(ProfileCreateState.waiting_for_negative_prompt)
async def profile_negative(message: Message, state: FSMContext):
"""Обработка негативного промпта и завершение создания профиля."""
negative_prompt = message.text.strip()
if negative_prompt == "-":
negative_prompt = ""
data = await state.get_data()
profile_id = await db.create_profile(
user_id=message.from_user.id,
name=data["name"],
width=data.get("width", 512),
height=data.get("height", 512),
steps=data.get("steps", 20),
cfg_scale=data.get("cfg_scale", 7.0),
sampler=data.get("sampler", "Euler a"),
scheduler=data.get("scheduler", "automatic"),
model=data.get("model", ""),
lora=data.get("lora", ""),
lora_strength=data.get("lora_strength", 0.8),
negative_prompt=negative_prompt,
is_default=False,
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="⭐ Сделать профилем по умолчанию", callback_data=f"profile_set_default_{profile_id}")],
[InlineKeyboardButton(text="📋 К списку профилей", callback_data="profiles_menu")],
])
await message.answer(
f"✅ <b>Профиль '{data['name']}' создан!</b>\n\n"
f"Размер: {data.get('width', 512)}x{data.get('height', 512)}\n"
f"Шагов: {data.get('steps', 20)}\n"
f"CFG Scale: {data.get('cfg_scale', 7.0)}\n"
f"Сэмплер: {data.get('sampler', 'Euler a')}\n"
f"Шедулер: {data.get('scheduler', 'automatic')}\n"
f"Модель: {data.get('model', 'текущая') or 'текущая'}\n"
f"LoRA: {data.get('lora', 'нет') or 'нет'} ({data.get('lora_strength', 0.8)})\n"
f"Негативный промпт: {negative_prompt or 'нет'}",
parse_mode="HTML",
reply_markup=keyboard,
)
await state.clear()
@router.callback_query(F.data.startswith("profile_view_"))
async def profile_view(callback: CallbackQuery):
"""Просмотр профиля."""
profile_id = int(callback.data.split("_")[-1])
profile = await db.get_profile(profile_id)
if not profile:
await callback.answer("Профиль не найден.", show_alert=True)
return
text = (
f"📋 <b>Профиль: {profile['name']}</b>\n\n"
f"{'⭐ По умолчанию' if profile['is_default'] else ''}\n"
f"Размер: {profile['width']}x{profile['height']}\n"
f"Шагов: {profile['steps']}\n"
f"CFG Scale: {profile['cfg_scale']}\n"
f"Сэмплер: {profile['sampler']}\n"
f"Шедулер: {profile.get('scheduler', 'automatic')}\n"
f"Модель: {profile['model'] or 'текущая'}\n"
f"LoRA: {profile['lora'] or 'нет'} ({profile['lora_strength']})\n"
f"Негативный промпт: {profile['negative_prompt'] or 'нет'}"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="⭐ По умолчанию", callback_data=f"profile_set_default_{profile_id}")
] if not profile['is_default'] else [
InlineKeyboardButton(text="⭐ Профиль по умолчанию", callback_data="noop")
],
[
InlineKeyboardButton(text="🗑️ Удалить", callback_data=f"profile_delete_{profile_id}"),
],
[
InlineKeyboardButton(text="⬅️ Назад", callback_data="profiles_menu"),
],
])
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data.startswith("profile_set_default_"))
async def profile_set_default(callback: CallbackQuery):
"""Установка профиля по умолчанию."""
profile_id = int(callback.data.split("_")[-1])
user_id = callback.from_user.id
await db.set_default_profile(user_id, profile_id)
await callback.answer("✅ Профиль установлен как профиль по умолчанию.")
await profile_view(callback)
@router.callback_query(F.data.startswith("profile_delete_"))
async def profile_delete(callback: CallbackQuery):
"""Удаление профиля."""
profile_id = int(callback.data.split("_")[-1])
user_id = callback.from_user.id
success = await db.delete_profile(profile_id, user_id)
if success:
await callback.answer("🗑️ Профиль удалён.")
await profiles_menu(callback)
else:
await callback.answer("❌ Ошибка удаления профиля.", show_alert=True)
+251
View File
@@ -0,0 +1,251 @@
"""Обработчики настроек хранения и вспомогательные команды."""
import logging
from aiogram import Router, F
from aiogram.types import CallbackQuery, Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.filters import Command
from config import settings
from database.database import db
from sd.sd_client import sd_client
from utils.image_manager import cleanup_expired_images
router = Router()
logger = logging.getLogger(__name__)
class StorageSettingsState(StatesGroup):
"""Состояния для настройки хранения."""
waiting_for_ttl = State()
# --- Обработчик главного меню ---
@router.callback_query(F.data == "main_menu")
async def main_menu(callback: CallbackQuery, state: FSMContext = None):
"""Возврат в главное меню."""
# Очищаем состояние, если оно есть
if state:
await state.clear()
from bot.handlers_start import get_main_keyboard
text = (
f"👋 <b>Привет, {callback.from_user.first_name}!</b>\n\n"
"Я бот для генерации изображений через Stable Diffusion.\n"
"Выберите действие из меню ниже:"
)
try:
await callback.message.edit_text(
text,
parse_mode="HTML",
reply_markup=get_main_keyboard(callback.from_user.id),
)
except Exception:
# Если edit_text не сработал (напр., сообщение с фото), отправляем новое
await callback.message.answer(
text,
parse_mode="HTML",
reply_markup=get_main_keyboard(callback.from_user.id),
)
await callback.answer()
# --- Проверка статуса SD API ---
@router.callback_query(F.data == "check_sd_status")
async def check_sd_status(callback: CallbackQuery):
"""Проверка соединения с SD API."""
status_msg = await callback.message.edit_text("🔍 Проверяю соединение с SD API...")
is_connected = await sd_client.check_connection()
if is_connected:
current_model = await sd_client.get_current_model()
text = (
f"✅ <b>SD API подключено!</b>\n\n"
f"Адрес: <code>{settings.SD_API_URL}</code>\n"
f"Текущая модель: <code>{current_model}</code>"
)
else:
text = (
f"❌ <b>SD API недоступно!</b>\n\n"
f"Адрес: <code>{settings.SD_API_URL}</code>\n\n"
"Проверьте:\n"
"1. Запущен ли Stable Diffusion WebUI\n"
"2. Доступен ли сервер по указанному адресу\n"
"3. Правильность настройки API (--api флаг)"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔄 Повторить проверку", callback_data="check_sd_status")],
[InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")],
])
await status_msg.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
# --- Помощь ---
@router.callback_query(F.data == "help")
async def help_command(callback: CallbackQuery):
"""Справка по боту."""
text = (
"️ <b>Справка по боту</b>\n\n"
"<b>Генерация изображений:</b>\n"
"• <b>txt2img</b> — создание изображения по текстовому описанию\n\n"
"<b>Профили:</b>\n"
"Создавайте профили с предустановленными настройками:\n"
"• Размер изображения\n"
"• Количество шагов\n"
"• CFG Scale\n"
"• Сэмплер\n"
"• Модель и LoRA\n\n"
"Вы можете установить профиль по умолчанию для быстрой генерации.\n\n"
"<b>Настройки хранения:</b>\n"
"Настройте время хранения сгенерированных изображений.\n"
"Изображения автоматически удаляются по истечении срока.\n\n"
"<b>Команды:</b>\n"
"/start — Главное меню\n"
"/menu — Показать главное меню\n"
"/cancel — Отменить текущую операцию\n"
"/status — Статус SD API"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")],
])
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.message(Command("status"))
async def cmd_status(message: Message):
"""Команда /status."""
is_connected = await sd_client.check_connection()
if is_connected:
current_model = await sd_client.get_current_model()
text = (
f"✅ <b>SD API подключено!</b>\n\n"
f"Адрес: <code>{settings.SD_API_URL}</code>\n"
f"Текущая модель: <code>{current_model}</code>"
)
else:
text = (
f"❌ <b>SD API недоступно!</b>\n\n"
f"Адрес: <code>{settings.SD_API_URL}</code>"
)
await message.answer(text, parse_mode="HTML")
# --- Настройки хранения ---
@router.callback_query(F.data == "storage_settings")
async def storage_settings(callback: CallbackQuery):
"""Меню настроек хранения."""
user_settings = await db.get_user_settings(callback.from_user.id)
current_ttl = user_settings.get("image_ttl_hours", settings.DEFAULT_IMAGE_TTL_HOURS)
text = (
"🗃️ <b>Настройки хранения изображений</b>\n\n"
f"Текущее время хранения: <b>{current_ttl} часов</b>\n"
f"Максимальное время: <b>{settings.MAX_IMAGE_TTL_HOURS} часов</b>\n\n"
"Изображения автоматически удаляются по истечении срока.\n"
"Очистка происходит каждые {interval} минут.".format(interval=settings.CLEANUP_INTERVAL_MINUTES)
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="12 часов", callback_data="set_ttl_12")],
[InlineKeyboardButton(text="24 часа (1 день)", callback_data="set_ttl_24")],
[InlineKeyboardButton(text="48 часов (2 дня)", callback_data="set_ttl_48")],
[InlineKeyboardButton(text="72 часа (3 дня)", callback_data="set_ttl_72")],
[InlineKeyboardButton(text="168 часов (7 дней)", callback_data="set_ttl_168")],
[InlineKeyboardButton(text="✏️ Ввести своё значение", callback_data="set_ttl_custom")],
[InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")],
])
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data.startswith("set_ttl_"))
async def set_ttl(callback: CallbackQuery, state: FSMContext):
"""Установка времени хранения."""
if callback.data == "set_ttl_custom":
await state.set_state(StorageSettingsState.waiting_for_ttl)
await callback.message.edit_text(
f"⏰ <b>Введите время хранения в часах</b>\n\n"
f"Допустимый диапазон: 1 - {settings.MAX_IMAGE_TTL_HOURS} часов.",
parse_mode="HTML",
)
await callback.answer()
return
ttl_hours = int(callback.data.replace("set_ttl_", ""))
await _save_ttl(callback, ttl_hours)
@router.message(StorageSettingsState.waiting_for_ttl)
async def save_custom_ttl(message: Message, state: FSMContext):
"""Сохранение пользовательского значения TTL."""
try:
ttl_hours = int(message.text.strip())
if ttl_hours < 1 or ttl_hours > settings.MAX_IMAGE_TTL_HOURS:
await message.answer(
f"Значение должно быть от 1 до {settings.MAX_IMAGE_TTL_HOURS} часов."
)
return
except ValueError:
await message.answer("Введите корректное целое число.")
return
await state.clear()
await _save_ttl(message, ttl_hours)
async def _save_ttl(target, ttl_hours: int):
"""Сохранить TTL и показать подтверждение."""
user_id = target.from_user.id
await db.update_user_settings(user_id, image_ttl_hours=ttl_hours)
text = (
f"✅ <b>Настройки сохранены!</b>\n\n"
f"Время хранения изображений: <b>{ttl_hours} часов</b>\n"
f"Изображения будут автоматически удаляться по истечении этого срока."
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🗃️ Другие настройки хранения", callback_data="storage_settings")],
[InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")],
])
if hasattr(target, 'message'):
await target.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
# --- Команда /cancel ---
@router.message(Command("cancel"))
async def cmd_cancel(message: Message, state: FSMContext):
"""Отмена текущей операции."""
current_state = await state.get_state()
if current_state is None:
await message.answer("Нет активных операций.")
return
await state.clear()
await message.answer("❌ Операция отменена.")
from bot.handlers_start import get_main_keyboard
await message.answer("📋 Главное меню:", reply_markup=get_main_keyboard(message.from_user.id))
# --- Периодическая очистка ---
async def scheduled_cleanup():
"""Функция для периодической очистки просроченных изображений."""
try:
deleted = await cleanup_expired_images()
if deleted > 0:
logger.info(f"Очистка: удалено {deleted} изображений")
except Exception as e:
logger.error(f"Ошибка при очистке изображений: {e}")
+51
View File
@@ -0,0 +1,51 @@
"""Обработчик команды /start и главного меню."""
from aiogram import Router
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from config import settings
router = Router()
def get_main_keyboard(user_id: int = None) -> InlineKeyboardMarkup:
"""Создать главное меню."""
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="🎨 Генерация (txt2img)", callback_data="gen_txt2img"),
],
[
InlineKeyboardButton(text="⚙️ Профили", callback_data="profiles_menu"),
InlineKeyboardButton(text="🗑️ Настройки хранения", callback_data="storage_settings"),
],
[
InlineKeyboardButton(text="📊 Статус SD API", callback_data="check_sd_status"),
InlineKeyboardButton(text="️ Помощь", callback_data="help"),
],
])
# Добавляем кнопку админ-панели для админа
if user_id and user_id == settings.ADMIN_ID:
keyboard.inline_keyboard.insert(0, [
InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel"),
])
return keyboard
@router.message(CommandStart())
async def cmd_start(message: Message):
"""Обработка команды /start."""
text = (
f"👋 <b>Привет, {message.from_user.first_name}!</b>\n\n"
"Я бот для генерации изображений через Stable Diffusion.\n"
"Выберите действие из меню ниже:"
)
await message.answer(text, reply_markup=get_main_keyboard(message.from_user.id), parse_mode="HTML")
@router.message(Command("menu"))
async def cmd_menu(message: Message):
"""Обработка команды /menu."""
await message.answer("📋 Главное меню:", reply_markup=get_main_keyboard(message.from_user.id))
+102
View File
@@ -0,0 +1,102 @@
"""Мидлварь для проверки доступа пользователей к боту."""
import logging
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery
from aiogram.filters import Command
from database.database import db
logger = logging.getLogger(__name__)
class AccessCheckMiddleware(BaseMiddleware):
"""Мидлварь проверяет, имеет ли пользователь доступ к боту."""
# Команды, которые всегда разрешены (без проверки доступа)
ALLOWED_COMMANDS = {"start", "menu"}
async def __call__(self, handler, event, data):
# Определяем тип события и извлекаем user_id
user_id = None
username = None
display_name = None
if isinstance(event, Message):
user_id = event.from_user.id
username = event.from_user.username
display_name = event.from_user.full_name
# Проверяем, является ли это разрешённой командой
if isinstance(event, Message) and event.text:
text = event.text.strip().lower()
for cmd in self.ALLOWED_COMMANDS:
# Проверяем /cmd и /cmd@username
if text == f"/{cmd}" or text.startswith(f"/{cmd}@"):
return await handler(event, data)
elif isinstance(event, CallbackQuery):
user_id = event.from_user.id
username = event.from_user.username
display_name = event.from_user.full_name
if user_id is None:
return await handler(event, data)
# Проверяем доступ
access_info = await db.check_user_access(user_id)
if access_info["has_access"]:
# Сохраняем информацию о пользователе в data для обработчиков
data["user_access"] = access_info
return await handler(event, data)
# Доступ запрещён — отправляем сообщение
reason = access_info["reason"]
user_info = access_info["user_info"]
# Формируем сообщение в зависимости от причины
if reason == "not_registered":
text = (
"🔒 <b>Доступ запрещён</b>\n\n"
"У вас нет доступа к этому боту. "
"Обратитесь к администратору для получения доступа."
)
elif reason == "deactivated":
text = (
"🔒 <b>Доступ заблокирован</b>\n\n"
"Ваш доступ к боту был отозван администратором."
)
elif reason == "expired":
expires_str = "не указан"
if user_info and user_info.get("access_expires_at"):
from datetime import datetime
try:
exp_date = datetime.fromisoformat(user_info["access_expires_at"])
expires_str = exp_date.strftime("%d.%m.%Y %H:%M")
except Exception:
pass
text = (
"⏰ <b>Срок доступа истёк</b>\n\n"
f"Ваш доступ к боту истёк ({expires_str}).\n"
"Обратитесь к администратору для продления."
)
elif reason == "generations_limit":
used = user_info.get("used_generations", "?") if user_info else "?"
max_gens = user_info.get("max_generations", "?") if user_info else "?"
text = (
"🎨 <b>Лимит генераций исчерпан</b>\n\n"
f"Вы использовали все {max_gens} генераций.\n"
"Обратитесь к администратору для увеличения лимита."
)
else:
text = "🔒 <b>Доступ запрещён</b>"
if isinstance(event, Message):
await event.answer(text, parse_mode="HTML")
elif isinstance(event, CallbackQuery):
await event.answer("Доступ запрещён", show_alert=True)
try:
await event.message.answer(text, parse_mode="HTML")
except Exception:
pass
return None