Files

242 lines
9.6 KiB
Python
Raw Permalink Normal View History

2026-05-31 18:46:09 +08:00
"""Обработчики генерации изображений (txt2img)."""
import logging
import os
import uuid
from datetime import datetime
from html import escape
from aiogram import Router, F
from aiogram.types import (
Message,
CallbackQuery,
InlineKeyboardMarkup,
InlineKeyboardButton,
FSInputFile,
)
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from config import settings
from database.database import db
from sd.sd_client import sd_client
from utils.image_manager import get_image_path
router = Router()
logger = logging.getLogger(__name__)
class Txt2ImgState(StatesGroup):
"""Состояния для процесса txt2img."""
waiting_for_prompt = State()
waiting_for_negative_prompt = State()
generating = State()
def get_generation_keyboard() -> InlineKeyboardMarkup:
"""Клавиатура для генерации txt2img."""
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="📝 Ввести промпт", callback_data="txt2img_enter_prompt"),
],
[
InlineKeyboardButton(text="📋 Использовать профиль", callback_data="txt2img_use_profile"),
],
[
InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu"),
],
])
@router.callback_query(F.data == "gen_txt2img")
async def start_txt2img(callback: CallbackQuery, state: FSMContext):
"""Начать процесс txt2img."""
await state.clear()
await callback.message.answer(
"🎨 <b>Генерация изображения (txt2img)</b>\n\n"
"Введите описание изображения (промпт).\n"
"Можете использовать <code>/cancel</code> для отмены.",
parse_mode="HTML",
reply_markup=get_generation_keyboard(),
)
await callback.answer()
@router.callback_query(F.data == "txt2img_enter_prompt")
async def enter_prompt_txt(callback: CallbackQuery, state: FSMContext):
"""Ввод промпта для txt2img."""
await state.set_state(Txt2ImgState.waiting_for_prompt)
await callback.message.edit_text(
"📝 <b>Введите промпт:</b>\n\n"
"Опишите изображение, которое хотите сгенерировать.\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.callback_query(F.data == "txt2img_use_profile")
async def use_profile_txt(callback: CallbackQuery, state: FSMContext):
"""Использовать профиль для txt2img."""
profiles = await db.get_user_profiles(callback.from_user.id)
if not profiles:
await callback.answer("У вас пока нет профилей. Создайте профиль в разделе 'Профили'.", show_alert=True)
return
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=p["name"], callback_data=f"txt2img_profile_{p['id']}")]
for p in profiles
])
keyboard.inline_keyboard.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="gen_txt2img")])
await state.set_state(Txt2ImgState.waiting_for_prompt)
await callback.message.edit_text(
"📋 <b>Выберите профиль:</b>\n\n"
"После выбора будет запрошен промпт.",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
@router.callback_query(F.data.startswith("txt2img_profile_"))
async def select_profile_txt(callback: CallbackQuery, state: FSMContext):
"""Выбор профиля для txt2img."""
profile_id = int(callback.data.split("_")[-1])
await state.update_data(profile_id=profile_id)
profile = await db.get_profile(profile_id)
await state.set_state(Txt2ImgState.waiting_for_prompt)
await callback.message.edit_text(
f"✅ Профиль <b>{profile['name']}</b> выбран.\n\n"
f"Параметры: {profile['width']}x{profile['height']}, "
f"шагов: {profile['steps']}, CFG: {profile['cfg_scale']}, "
f"сэмплер: {profile['sampler']}, шедулер: {profile.get('scheduler', 'automatic')}\n\n"
f"📝 <b>Введите промпт:</b>\n"
f"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(Txt2ImgState.waiting_for_prompt, F.text)
async def process_prompt_txt(message: Message, state: FSMContext):
"""Обработка введённого промпта для txt2img."""
prompt = message.text.strip()
if not prompt:
await message.answer("Промпт не может быть пустым. Введите описание изображения.")
return
await state.update_data(prompt=prompt)
# Запрашиваем негативный промпт
await state.set_state(Txt2ImgState.waiting_for_negative_prompt)
await message.answer(
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
"Опишите, что НЕ должно быть на изображении.\n"
"Отправьте <code>-</code> чтобы пропустить.\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
@router.message(Txt2ImgState.waiting_for_negative_prompt)
async def process_negative_prompt_txt(message: Message, state: FSMContext):
"""Обработка негативного промпта для txt2img."""
negative_prompt = message.text.strip()
if negative_prompt == "-":
negative_prompt = ""
await state.update_data(negative_prompt=negative_prompt)
await state.set_state(Txt2ImgState.generating)
# Получаем данные
data = await state.get_data()
prompt = data.get("prompt", "")
profile_id = data.get("profile_id")
# Загружаем настройки пользователя для TTL
user_settings = await db.get_user_settings(message.from_user.id)
ttl_hours = user_settings.get("image_ttl_hours", settings.DEFAULT_IMAGE_TTL_HOURS)
# Если есть профиль, используем его настройки
profile = None
if profile_id:
profile = await db.get_profile(profile_id)
# Отправляем сообщение о начале генерации
status_msg = await message.answer("⏳ <b>Генерация изображения...</b>\n\nЭто может занять некоторое время.", parse_mode="HTML")
# Вызываем API
result = await sd_client.txt2img(
prompt=prompt,
negative_prompt=negative_prompt or (profile["negative_prompt"] if profile else ""),
width=profile["width"] if profile else 512,
height=profile["height"] if profile else 512,
steps=profile["steps"] if profile else 20,
cfg_scale=profile["cfg_scale"] if profile else 7.0,
sampler=profile["sampler"] if profile else "Euler a",
scheduler=profile.get("scheduler", "automatic") if profile else "automatic",
model=profile.get("model") if profile else None,
lora=profile.get("lora") if profile and profile.get("lora") else None,
lora_strength=profile.get("lora_strength", 0.8) if profile else 0.8,
)
if result is None:
await status_msg.edit_text("❌ <b>Ошибка генерации.</b>\n\nПроверьте соединение с SD API и попробуйте снова.")
await state.clear()
return
image_bytes, info = result
# Сохраняем изображение
filename = f"{uuid.uuid4().hex[:12]}.png"
file_path = get_image_path(message.from_user.id, filename)
try:
with open(file_path, "wb") as f:
f.write(image_bytes)
except Exception as e:
logger.error(f"Ошибка сохранения изображения: {e}")
await status_msg.edit_text("❌ <b>Ошибка сохранения изображения.</b>")
await state.clear()
return
# Сохраняем в БД
await db.add_generated_image(
user_id=message.from_user.id,
file_path=file_path,
prompt=prompt,
ttl_hours=ttl_hours,
)
# Увеличиваем счётчик генераций
await db.increment_generation_count(message.from_user.id)
# Отправляем изображение
info_text = (
f"✅ <b>Изображение сгенерировано!</b>\n\n"
f"<b>Промпт:</b> <code>{escape(info['prompt'][:500])}</code>\n"
f"<b>Размер:</b> {info['width']}x{info['height']}\n"
f"<b>Шагов:</b> {info['steps']}\n"
f"<b>CFG Scale:</b> {info['cfg_scale']}\n"
f"<b>Сэмплер:</b> {info['sampler']}\n"
f"<b>Шедулер:</b> {info.get('scheduler', 'automatic')}\n"
f"<b>Seed:</b> <code>{info['seed']}</code>\n"
f"<b>Модель:</b> {info['model']}\n"
f"<b>Время хранения:</b> {ttl_hours} ч."
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔄 Сгенерировать ещё", callback_data="gen_txt2img")],
[InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")],
])
await message.answer_photo(
photo=FSInputFile(file_path),
caption=info_text,
parse_mode="HTML",
reply_markup=keyboard,
)
await status_msg.delete()
await state.clear()