Files
tg-sd/bot/handlers_profiles.py
dinlo b88ccf3b4b Initial commit
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 18:46:09 +08:00

635 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Обработчики управления профилями."""
import logging
import math
from aiogram import Router, F
from aiogram.types import CallbackQuery, Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from database.database import db
from sd.sd_client import sd_client
router = Router()
logger = logging.getLogger(__name__)
# Константы для пагинации
ITEMS_PER_PAGE = 8
class ProfileCreateState(StatesGroup):
"""Состояния для создания профиля."""
waiting_for_name = State()
waiting_for_width = State()
waiting_for_height = State()
waiting_for_steps = State()
waiting_for_cfg = State()
waiting_for_sampler = State()
waiting_for_scheduler = State()
waiting_for_model = State()
waiting_for_lora = State()
waiting_for_lora_strength = State()
waiting_for_negative_prompt = State()
def _build_pagination_keyboard(items: list[str], current_page: int, callback_prefix: str, skip_callback: str) -> InlineKeyboardMarkup:
"""Построить клавиатуру с пагинацией."""
total_pages = math.ceil(len(items) / ITEMS_PER_PAGE) if items else 1
start = current_page * ITEMS_PER_PAGE
end = start + ITEMS_PER_PAGE
page_items = items[start:end]
keyboard = []
for item in page_items:
# Callback data ограничен 64 байтами
safe_data = item[:56]
keyboard.append([InlineKeyboardButton(
text=item[:30],
callback_data=f"{callback_prefix}_{safe_data}"
)])
# Навигация
nav_row = []
if current_page > 0:
nav_row.append(InlineKeyboardButton(text="⬅️", callback_data=f"{callback_prefix}_page_{current_page - 1}"))
nav_row.append(InlineKeyboardButton(text=f"{current_page + 1}/{total_pages}", callback_data="noop"))
if current_page < total_pages - 1:
nav_row.append(InlineKeyboardButton(text="➡️", callback_data=f"{callback_prefix}_page_{current_page + 1}"))
keyboard.append(nav_row)
keyboard.append([InlineKeyboardButton(text="⏭️ Пропустить", callback_data=skip_callback)])
return InlineKeyboardMarkup(inline_keyboard=keyboard)
@router.callback_query(F.data == "profiles_menu")
async def profiles_menu(callback: CallbackQuery):
"""Меню профилей."""
profiles = await db.get_user_profiles(callback.from_user.id)
if not profiles:
text = (
"📋 <b>Управление профилями</b>\n\n"
"У вас пока нет профилей. Создайте первый профиль для быстрой генерации."
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=" Создать профиль", callback_data="profile_create")],
[InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")],
])
else:
text = "📋 <b>Ваши профили:</b>\n\n"
kb_buttons = []
for p in profiles:
default_mark = "" if p["is_default"] else ""
text += f"<b>{p['name']}</b>{default_mark}\n"
text += f" Размер: {p['width']}x{p['height']}, Шагов: {p['steps']}, CFG: {p['cfg_scale']}\n"
text += f" Сэмплер: {p['sampler']}, Шедулер: {p.get('scheduler', 'automatic')}\n"
if p.get("model"):
text += f" Модель: {p['model']}\n"
if p.get("lora"):
text += f" LoRA: {p['lora']} ({p['lora_strength']})\n"
text += "\n"
kb_buttons.append([
InlineKeyboardButton(
text=f"{'' if p['is_default'] else ''}{p['name']}",
callback_data=f"profile_view_{p['id']}"
)
])
kb_buttons.append([InlineKeyboardButton(text=" Создать профиль", callback_data="profile_create")])
kb_buttons.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")])
keyboard = InlineKeyboardMarkup(inline_keyboard=kb_buttons)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "profile_create")
async def profile_create(callback: CallbackQuery, state: FSMContext):
"""Начать создание профиля."""
await state.set_state(ProfileCreateState.waiting_for_name)
await callback.message.edit_text(
"➕ <b>Создание нового профиля</b>\n\n"
"Введите <b>название</b> профиля:\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(ProfileCreateState.waiting_for_name)
async def profile_name(message: Message, state: FSMContext):
"""Обработка названия профиля."""
name = message.text.strip()
if len(name) < 2:
await message.answer("Название должно быть не менее 2 символов.")
return
await state.update_data(name=name)
await state.set_state(ProfileCreateState.waiting_for_width)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="512x512", callback_data="size_512_512"),
InlineKeyboardButton(text="768x768", callback_data="size_768_768"),
],
[
InlineKeyboardButton(text="512x768 (портрет)", callback_data="size_512_768"),
InlineKeyboardButton(text="768x512 (ландшафт)", callback_data="size_768_512"),
],
[
InlineKeyboardButton(text="1024x1024", callback_data="size_1024_1024"),
],
[
InlineKeyboardButton(text="✏️ Ввести свой размер", callback_data="size_custom"),
],
])
await message.answer(
"📐 <b>Выберите размер изображения:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("size_"), ProfileCreateState.waiting_for_width)
async def profile_size(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора размера."""
if callback.data == "size_custom":
await callback.message.edit_text(
"📐 <b>Введите ширину</b> (например, 512, 768, 1024):",
parse_mode="HTML",
)
return
parts = callback.data.replace("size_", "").split("_")
width, height = int(parts[0]), int(parts[1])
await state.update_data(width=width, height=height)
await state.set_state(ProfileCreateState.waiting_for_steps)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="20 (быстро)", callback_data="steps_20"),
InlineKeyboardButton(text="30 (качество)", callback_data="steps_30"),
],
[
InlineKeyboardButton(text="40 (высокое качество)", callback_data="steps_40"),
],
[
InlineKeyboardButton(text="✏️ Своё значение", callback_data="steps_custom"),
],
])
await callback.message.edit_text(
f"✅ Размер: {width}x{height}\n\n"
"🔢 <b>Выберите количество шагов:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.message(ProfileCreateState.waiting_for_width, F.text.isdigit())
async def profile_custom_width(message: Message, state: FSMContext):
"""Ввод пользовательской ширины."""
width = int(message.text.strip())
if width < 64 or width > 2048:
await message.answer("Размер должен быть от 64 до 2048 пикселей.")
return
await state.update_data(width=width)
await message.answer("📐 <b>Теперь введите высоту:</b>")
await state.set_state(ProfileCreateState.waiting_for_height)
@router.message(ProfileCreateState.waiting_for_height, F.text.isdigit())
async def profile_height(message: Message, state: FSMContext):
"""Обработка высоты."""
height = int(message.text.strip())
if height < 64 or height > 2048:
await message.answer("Размер должен быть от 64 до 2048 пикселей.")
return
await state.update_data(height=height)
await state.set_state(ProfileCreateState.waiting_for_steps)
data = await state.get_data()
width = data.get("width", "?")
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="20 (быстро)", callback_data="steps_20"),
InlineKeyboardButton(text="30 (качество)", callback_data="steps_30"),
],
[
InlineKeyboardButton(text="40 (высокое качество)", callback_data="steps_40"),
],
[
InlineKeyboardButton(text="✏️ Своё значение", callback_data="steps_custom"),
],
])
await message.answer(
f"✅ Размер: {width}x{height}\n\n"
"🔢 <b>Выберите количество шагов:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("steps_"), ProfileCreateState.waiting_for_steps)
async def profile_steps(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора шагов."""
if callback.data == "steps_custom":
await callback.message.edit_text(
"🔢 <b>Введите количество шагов</b> (10-150):",
parse_mode="HTML",
)
return
steps = int(callback.data.replace("steps_", ""))
await state.update_data(steps=steps)
await state.set_state(ProfileCreateState.waiting_for_cfg)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="5.0 (свободнее)", callback_data="cfg_5"),
InlineKeyboardButton(text="7.0 (стандарт)", callback_data="cfg_7"),
],
[
InlineKeyboardButton(text="9.0 (строже)", callback_data="cfg_9"),
InlineKeyboardButton(text="✏️ Своё значение", callback_data="cfg_custom"),
],
])
await callback.message.edit_text(
f"✅ Шагов: {steps}\n\n"
"🎚️ <b>Выберите CFG Scale:</b>\n"
"Определяет, насколько строго модель следует промпту.",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.message(ProfileCreateState.waiting_for_steps, F.text.isdigit())
async def profile_custom_steps(message: Message, state: FSMContext):
"""Ввод пользовательских шагов."""
steps = int(message.text.strip())
if steps < 10 or steps > 150:
await message.answer("Количество шагов должно быть от 10 до 150.")
return
await state.update_data(steps=steps)
await state.set_state(ProfileCreateState.waiting_for_cfg)
await message.answer(
"🎚️ <b>Введите CFG Scale</b> (обычно 5.0-12.0):\n"
"Отправьте <code>-</code> для стандартного значения (7.0).",
parse_mode="HTML",
)
@router.callback_query(F.data.startswith("cfg_"), ProfileCreateState.waiting_for_cfg)
async def profile_cfg(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора CFG Scale."""
if callback.data == "cfg_custom":
await callback.message.edit_text(
"🎚️ <b>Введите CFG Scale</b> (1.0-30.0):",
parse_mode="HTML",
)
return
cfg = float(callback.data.replace("cfg_", ""))
await state.update_data(cfg_scale=cfg)
await state.set_state(ProfileCreateState.waiting_for_sampler)
# Получаем сэмплеры из API
samplers = await sd_client.get_samplers()
if not samplers:
samplers = ["Euler a", "Euler", "DPM++ 2M Karras", "DPM++ SDE Karras", "DDIM"]
await state.update_data(available_samplers=samplers)
keyboard = _build_pagination_keyboard(samplers, 0, "sampler", "sampler_skip")
await callback.message.edit_text(
f"✅ CFG Scale: {cfg}\n\n"
"🔄 <b>Выберите сэмплер:</b>\n"
f"Доступно: {len(samplers)}",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.callback_query(F.data.startswith("sampler_page_"), ProfileCreateState.waiting_for_sampler)
async def profile_sampler_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы сэмплеров."""
page = int(callback.data.split("_")[-1])
samplers = (await state.get_data()).get("available_samplers", [])
keyboard = _build_pagination_keyboard(samplers, page, "sampler", "sampler_skip")
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "sampler_skip", ProfileCreateState.waiting_for_sampler)
async def profile_sampler_skip(callback: CallbackQuery, state: FSMContext):
"""Пропустить выбор сэмплера."""
await state.update_data(sampler="Euler a", scheduler="automatic")
await _show_model_selection(callback, state)
@router.callback_query(F.data.startswith("sampler_"), ProfileCreateState.waiting_for_sampler)
async def profile_sampler_select(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора сэмплера."""
sampler = callback.data.replace("sampler_", "")
await state.update_data(sampler=sampler, scheduler="automatic")
await _show_model_selection(callback, state)
async def _show_model_selection(callback: CallbackQuery, state: FSMContext):
"""Показать выбор модели."""
data = await state.get_data()
sampler = data.get("sampler", "Euler a")
await callback.answer("⏳ Загружаю список моделей...", show_alert=False)
models = await sd_client.get_models()
if not models:
models = ["— Не менять —"]
else:
models = ["— Не менять —"] + models
await state.update_data(available_models=models)
await state.set_state(ProfileCreateState.waiting_for_model)
keyboard = _build_pagination_keyboard(models, 0, "model", "model_skip")
await callback.message.edit_text(
f"✅ Сэмплер: <b>{sampler}</b>\n\n"
"🤖 <b>Выберите модель:</b>\n"
f"Доступно: {len(models) - 1}",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("model_page_"), ProfileCreateState.waiting_for_model)
async def profile_model_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы моделей."""
page = int(callback.data.split("_")[-1])
models = (await state.get_data()).get("available_models", [])
keyboard = _build_pagination_keyboard(models, page, "model", "model_skip")
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "model_skip", ProfileCreateState.waiting_for_model)
async def profile_model_skip(callback: CallbackQuery, state: FSMContext):
"""Пропустить выбор модели."""
await state.update_data(model="")
await _show_lora_selection(callback, state)
@router.callback_query(F.data.startswith("model_"), ProfileCreateState.waiting_for_model)
async def profile_model_select(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора модели."""
model = callback.data.replace("model_", "")
if model == "— Не менять —":
model = ""
await state.update_data(model=model)
await _show_lora_selection(callback, state)
async def _show_lora_selection(callback: CallbackQuery, state: FSMContext):
"""Показать выбор LoRA."""
data = await state.get_data()
model = data.get("model", "")
await callback.answer("⏳ Загружаю список LoRA...", show_alert=False)
loras = await sd_client.get_loras()
await state.update_data(available_loras=loras)
await state.set_state(ProfileCreateState.waiting_for_lora)
if not loras:
await state.update_data(lora="", lora_strength=0.8)
await _show_negative_prompt_request(callback, state)
return
loras = ["— Не использовать —"] + loras
keyboard = _build_pagination_keyboard(loras, 0, "lora", "lora_skip")
await callback.message.edit_text(
f"✅ Модель: <b>{model or 'текущая'}</b>\n\n"
"🎨 <b>Выберите LoRA:</b>\n"
f"Доступно: {len(loras) - 1}",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("lora_page_"), ProfileCreateState.waiting_for_lora)
async def profile_lora_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы LoRA."""
page = int(callback.data.split("_")[-1])
loras = (await state.get_data()).get("available_loras", [])
keyboard = _build_pagination_keyboard(loras, page, "lora", "lora_skip")
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data == "lora_skip", ProfileCreateState.waiting_for_lora)
async def profile_lora_skip(callback: CallbackQuery, state: FSMContext):
"""Пропустить выбор LoRA."""
await state.update_data(lora="", lora_strength=0.8)
await _show_negative_prompt_request(callback, state)
@router.callback_query(F.data.startswith("lora_"), ProfileCreateState.waiting_for_lora)
async def profile_lora_select(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора LoRA."""
lora = callback.data.replace("lora_", "")
if lora == "— Не использовать —":
await state.update_data(lora="", lora_strength=0.8)
await _show_negative_prompt_request(callback, state)
return
await state.update_data(lora=lora)
await state.set_state(ProfileCreateState.waiting_for_lora_strength)
await callback.message.edit_text(
f"✅ LoRA: <b>{lora}</b>\n\n"
"💪 <b>Введите силу LoRA</b> (0.0-1.0, обычно 0.8):\n"
"Отправьте <code>-</code> для стандартного значения (0.8).",
parse_mode="HTML",
)
@router.message(ProfileCreateState.waiting_for_lora_strength)
async def profile_lora_strength(message: Message, state: FSMContext):
"""Обработка силы LoRA."""
text = message.text.strip()
if text == "-":
lora_strength = 0.8
else:
try:
lora_strength = float(text)
if lora_strength < 0.0 or lora_strength > 1.0:
await message.answer("Сила LoRA должна быть от 0.0 до 1.0.")
return
except ValueError:
await message.answer("Введите корректное число (например, 0.8).")
return
await state.update_data(lora_strength=lora_strength)
await _show_negative_prompt_request_msg(message, state)
async def _show_negative_prompt_request(callback: CallbackQuery, state: FSMContext):
"""Запросить негативный промпт (из callback)."""
data = await state.get_data()
await state.set_state(ProfileCreateState.waiting_for_negative_prompt)
lora_text = data.get("lora", "")
lora_info = f"LoRA: {lora_text} ({data.get('lora_strength', 0.8)})" if lora_text else "LoRA: нет"
await callback.message.edit_text(
f"✅ Модель: <b>{data.get('model', '') or 'текущая'}</b>\n"
f"{lora_info}\n\n"
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
"Опишите, что НЕ должно быть на изображении.\n"
"Отправьте <code>-</code> чтобы пропустить.",
parse_mode="HTML",
)
async def _show_negative_prompt_request_msg(message: Message, state: FSMContext):
"""Запросить негативный промпт (из message)."""
data = await state.get_data()
await state.set_state(ProfileCreateState.waiting_for_negative_prompt)
lora_text = data.get("lora", "")
lora_info = f"LoRA: {lora_text} ({data.get('lora_strength', 0.8)})" if lora_text else "LoRA: нет"
await message.answer(
f"✅ Модель: <b>{data.get('model', '') or 'текущая'}</b>\n"
f"{lora_info}\n\n"
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
"Опишите, что НЕ должно быть на изображении.\n"
"Отправьте <code>-</code> чтобы пропустить.",
parse_mode="HTML",
)
@router.message(ProfileCreateState.waiting_for_negative_prompt)
async def profile_negative(message: Message, state: FSMContext):
"""Обработка негативного промпта и завершение создания профиля."""
negative_prompt = message.text.strip()
if negative_prompt == "-":
negative_prompt = ""
data = await state.get_data()
profile_id = await db.create_profile(
user_id=message.from_user.id,
name=data["name"],
width=data.get("width", 512),
height=data.get("height", 512),
steps=data.get("steps", 20),
cfg_scale=data.get("cfg_scale", 7.0),
sampler=data.get("sampler", "Euler a"),
scheduler=data.get("scheduler", "automatic"),
model=data.get("model", ""),
lora=data.get("lora", ""),
lora_strength=data.get("lora_strength", 0.8),
negative_prompt=negative_prompt,
is_default=False,
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="⭐ Сделать профилем по умолчанию", callback_data=f"profile_set_default_{profile_id}")],
[InlineKeyboardButton(text="📋 К списку профилей", callback_data="profiles_menu")],
])
await message.answer(
f"✅ <b>Профиль '{data['name']}' создан!</b>\n\n"
f"Размер: {data.get('width', 512)}x{data.get('height', 512)}\n"
f"Шагов: {data.get('steps', 20)}\n"
f"CFG Scale: {data.get('cfg_scale', 7.0)}\n"
f"Сэмплер: {data.get('sampler', 'Euler a')}\n"
f"Шедулер: {data.get('scheduler', 'automatic')}\n"
f"Модель: {data.get('model', 'текущая') or 'текущая'}\n"
f"LoRA: {data.get('lora', 'нет') or 'нет'} ({data.get('lora_strength', 0.8)})\n"
f"Негативный промпт: {negative_prompt or 'нет'}",
parse_mode="HTML",
reply_markup=keyboard,
)
await state.clear()
@router.callback_query(F.data.startswith("profile_view_"))
async def profile_view(callback: CallbackQuery):
"""Просмотр профиля."""
profile_id = int(callback.data.split("_")[-1])
profile = await db.get_profile(profile_id)
if not profile:
await callback.answer("Профиль не найден.", show_alert=True)
return
text = (
f"📋 <b>Профиль: {profile['name']}</b>\n\n"
f"{'⭐ По умолчанию' if profile['is_default'] else ''}\n"
f"Размер: {profile['width']}x{profile['height']}\n"
f"Шагов: {profile['steps']}\n"
f"CFG Scale: {profile['cfg_scale']}\n"
f"Сэмплер: {profile['sampler']}\n"
f"Шедулер: {profile.get('scheduler', 'automatic')}\n"
f"Модель: {profile['model'] or 'текущая'}\n"
f"LoRA: {profile['lora'] or 'нет'} ({profile['lora_strength']})\n"
f"Негативный промпт: {profile['negative_prompt'] or 'нет'}"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="⭐ По умолчанию", callback_data=f"profile_set_default_{profile_id}")
] if not profile['is_default'] else [
InlineKeyboardButton(text="⭐ Профиль по умолчанию", callback_data="noop")
],
[
InlineKeyboardButton(text="🗑️ Удалить", callback_data=f"profile_delete_{profile_id}"),
],
[
InlineKeyboardButton(text="⬅️ Назад", callback_data="profiles_menu"),
],
])
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data.startswith("profile_set_default_"))
async def profile_set_default(callback: CallbackQuery):
"""Установка профиля по умолчанию."""
profile_id = int(callback.data.split("_")[-1])
user_id = callback.from_user.id
await db.set_default_profile(user_id, profile_id)
await callback.answer("✅ Профиль установлен как профиль по умолчанию.")
await profile_view(callback)
@router.callback_query(F.data.startswith("profile_delete_"))
async def profile_delete(callback: CallbackQuery):
"""Удаление профиля."""
profile_id = int(callback.data.split("_")[-1])
user_id = callback.from_user.id
success = await db.delete_profile(profile_id, user_id)
if success:
await callback.answer("🗑️ Профиль удалён.")
await profiles_menu(callback)
else:
await callback.answer("❌ Ошибка удаления профиля.", show_alert=True)