b88ccf3b4b
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
242 lines
9.6 KiB
Python
242 lines
9.6 KiB
Python
"""Обработчики генерации изображений (txt2img)."""
|
|
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from html import escape
|
|
from aiogram import Router, F
|
|
from aiogram.types import (
|
|
Message,
|
|
CallbackQuery,
|
|
InlineKeyboardMarkup,
|
|
InlineKeyboardButton,
|
|
FSInputFile,
|
|
)
|
|
from aiogram.fsm.context import FSMContext
|
|
from aiogram.fsm.state import State, StatesGroup
|
|
|
|
from config import settings
|
|
from database.database import db
|
|
from sd.sd_client import sd_client
|
|
from utils.image_manager import get_image_path
|
|
|
|
router = Router()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Txt2ImgState(StatesGroup):
|
|
"""Состояния для процесса txt2img."""
|
|
waiting_for_prompt = State()
|
|
waiting_for_negative_prompt = State()
|
|
generating = State()
|
|
|
|
|
|
def get_generation_keyboard() -> InlineKeyboardMarkup:
|
|
"""Клавиатура для генерации txt2img."""
|
|
return InlineKeyboardMarkup(inline_keyboard=[
|
|
[
|
|
InlineKeyboardButton(text="📝 Ввести промпт", callback_data="txt2img_enter_prompt"),
|
|
],
|
|
[
|
|
InlineKeyboardButton(text="📋 Использовать профиль", callback_data="txt2img_use_profile"),
|
|
],
|
|
[
|
|
InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu"),
|
|
],
|
|
])
|
|
|
|
|
|
@router.callback_query(F.data == "gen_txt2img")
|
|
async def start_txt2img(callback: CallbackQuery, state: FSMContext):
|
|
"""Начать процесс txt2img."""
|
|
await state.clear()
|
|
await callback.message.answer(
|
|
"🎨 <b>Генерация изображения (txt2img)</b>\n\n"
|
|
"Введите описание изображения (промпт).\n"
|
|
"Можете использовать <code>/cancel</code> для отмены.",
|
|
parse_mode="HTML",
|
|
reply_markup=get_generation_keyboard(),
|
|
)
|
|
await callback.answer()
|
|
|
|
|
|
@router.callback_query(F.data == "txt2img_enter_prompt")
|
|
async def enter_prompt_txt(callback: CallbackQuery, state: FSMContext):
|
|
"""Ввод промпта для txt2img."""
|
|
await state.set_state(Txt2ImgState.waiting_for_prompt)
|
|
await callback.message.edit_text(
|
|
"📝 <b>Введите промпт:</b>\n\n"
|
|
"Опишите изображение, которое хотите сгенерировать.\n"
|
|
"Используйте <code>/cancel</code> для отмены.",
|
|
parse_mode="HTML",
|
|
)
|
|
await callback.answer()
|
|
|
|
|
|
@router.callback_query(F.data == "txt2img_use_profile")
|
|
async def use_profile_txt(callback: CallbackQuery, state: FSMContext):
|
|
"""Использовать профиль для txt2img."""
|
|
profiles = await db.get_user_profiles(callback.from_user.id)
|
|
if not profiles:
|
|
await callback.answer("У вас пока нет профилей. Создайте профиль в разделе 'Профили'.", show_alert=True)
|
|
return
|
|
|
|
keyboard = InlineKeyboardMarkup(inline_keyboard=[
|
|
[InlineKeyboardButton(text=p["name"], callback_data=f"txt2img_profile_{p['id']}")]
|
|
for p in profiles
|
|
])
|
|
keyboard.inline_keyboard.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="gen_txt2img")])
|
|
|
|
await state.set_state(Txt2ImgState.waiting_for_prompt)
|
|
await callback.message.edit_text(
|
|
"📋 <b>Выберите профиль:</b>\n\n"
|
|
"После выбора будет запрошен промпт.",
|
|
parse_mode="HTML",
|
|
reply_markup=keyboard,
|
|
)
|
|
await callback.answer()
|
|
|
|
|
|
@router.callback_query(F.data.startswith("txt2img_profile_"))
|
|
async def select_profile_txt(callback: CallbackQuery, state: FSMContext):
|
|
"""Выбор профиля для txt2img."""
|
|
profile_id = int(callback.data.split("_")[-1])
|
|
await state.update_data(profile_id=profile_id)
|
|
profile = await db.get_profile(profile_id)
|
|
|
|
await state.set_state(Txt2ImgState.waiting_for_prompt)
|
|
await callback.message.edit_text(
|
|
f"✅ Профиль <b>{profile['name']}</b> выбран.\n\n"
|
|
f"Параметры: {profile['width']}x{profile['height']}, "
|
|
f"шагов: {profile['steps']}, CFG: {profile['cfg_scale']}, "
|
|
f"сэмплер: {profile['sampler']}, шедулер: {profile.get('scheduler', 'automatic')}\n\n"
|
|
f"📝 <b>Введите промпт:</b>\n"
|
|
f"Используйте <code>/cancel</code> для отмены.",
|
|
parse_mode="HTML",
|
|
)
|
|
await callback.answer()
|
|
|
|
|
|
@router.message(Txt2ImgState.waiting_for_prompt, F.text)
|
|
async def process_prompt_txt(message: Message, state: FSMContext):
|
|
"""Обработка введённого промпта для txt2img."""
|
|
prompt = message.text.strip()
|
|
if not prompt:
|
|
await message.answer("Промпт не может быть пустым. Введите описание изображения.")
|
|
return
|
|
|
|
await state.update_data(prompt=prompt)
|
|
|
|
# Запрашиваем негативный промпт
|
|
await state.set_state(Txt2ImgState.waiting_for_negative_prompt)
|
|
await message.answer(
|
|
"🚫 <b>Введите негативный промпт</b> (необязательно)\n\n"
|
|
"Опишите, что НЕ должно быть на изображении.\n"
|
|
"Отправьте <code>-</code> чтобы пропустить.\n"
|
|
"Используйте <code>/cancel</code> для отмены.",
|
|
parse_mode="HTML",
|
|
)
|
|
|
|
|
|
@router.message(Txt2ImgState.waiting_for_negative_prompt)
|
|
async def process_negative_prompt_txt(message: Message, state: FSMContext):
|
|
"""Обработка негативного промпта для txt2img."""
|
|
negative_prompt = message.text.strip()
|
|
if negative_prompt == "-":
|
|
negative_prompt = ""
|
|
|
|
await state.update_data(negative_prompt=negative_prompt)
|
|
await state.set_state(Txt2ImgState.generating)
|
|
|
|
# Получаем данные
|
|
data = await state.get_data()
|
|
prompt = data.get("prompt", "")
|
|
profile_id = data.get("profile_id")
|
|
|
|
# Загружаем настройки пользователя для TTL
|
|
user_settings = await db.get_user_settings(message.from_user.id)
|
|
ttl_hours = user_settings.get("image_ttl_hours", settings.DEFAULT_IMAGE_TTL_HOURS)
|
|
|
|
# Если есть профиль, используем его настройки
|
|
profile = None
|
|
if profile_id:
|
|
profile = await db.get_profile(profile_id)
|
|
|
|
# Отправляем сообщение о начале генерации
|
|
status_msg = await message.answer("⏳ <b>Генерация изображения...</b>\n\nЭто может занять некоторое время.", parse_mode="HTML")
|
|
|
|
# Вызываем API
|
|
result = await sd_client.txt2img(
|
|
prompt=prompt,
|
|
negative_prompt=negative_prompt or (profile["negative_prompt"] if profile else ""),
|
|
width=profile["width"] if profile else 512,
|
|
height=profile["height"] if profile else 512,
|
|
steps=profile["steps"] if profile else 20,
|
|
cfg_scale=profile["cfg_scale"] if profile else 7.0,
|
|
sampler=profile["sampler"] if profile else "Euler a",
|
|
scheduler=profile.get("scheduler", "automatic") if profile else "automatic",
|
|
model=profile.get("model") if profile else None,
|
|
lora=profile.get("lora") if profile and profile.get("lora") else None,
|
|
lora_strength=profile.get("lora_strength", 0.8) if profile else 0.8,
|
|
)
|
|
|
|
if result is None:
|
|
await status_msg.edit_text("❌ <b>Ошибка генерации.</b>\n\nПроверьте соединение с SD API и попробуйте снова.")
|
|
await state.clear()
|
|
return
|
|
|
|
image_bytes, info = result
|
|
|
|
# Сохраняем изображение
|
|
filename = f"{uuid.uuid4().hex[:12]}.png"
|
|
file_path = get_image_path(message.from_user.id, filename)
|
|
|
|
try:
|
|
with open(file_path, "wb") as f:
|
|
f.write(image_bytes)
|
|
except Exception as e:
|
|
logger.error(f"Ошибка сохранения изображения: {e}")
|
|
await status_msg.edit_text("❌ <b>Ошибка сохранения изображения.</b>")
|
|
await state.clear()
|
|
return
|
|
|
|
# Сохраняем в БД
|
|
await db.add_generated_image(
|
|
user_id=message.from_user.id,
|
|
file_path=file_path,
|
|
prompt=prompt,
|
|
ttl_hours=ttl_hours,
|
|
)
|
|
|
|
# Увеличиваем счётчик генераций
|
|
await db.increment_generation_count(message.from_user.id)
|
|
|
|
# Отправляем изображение
|
|
info_text = (
|
|
f"✅ <b>Изображение сгенерировано!</b>\n\n"
|
|
f"<b>Промпт:</b> <code>{escape(info['prompt'][:500])}</code>\n"
|
|
f"<b>Размер:</b> {info['width']}x{info['height']}\n"
|
|
f"<b>Шагов:</b> {info['steps']}\n"
|
|
f"<b>CFG Scale:</b> {info['cfg_scale']}\n"
|
|
f"<b>Сэмплер:</b> {info['sampler']}\n"
|
|
f"<b>Шедулер:</b> {info.get('scheduler', 'automatic')}\n"
|
|
f"<b>Seed:</b> <code>{info['seed']}</code>\n"
|
|
f"<b>Модель:</b> {info['model']}\n"
|
|
f"<b>Время хранения:</b> {ttl_hours} ч."
|
|
)
|
|
|
|
keyboard = InlineKeyboardMarkup(inline_keyboard=[
|
|
[InlineKeyboardButton(text="🔄 Сгенерировать ещё", callback_data="gen_txt2img")],
|
|
[InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")],
|
|
])
|
|
|
|
await message.answer_photo(
|
|
photo=FSInputFile(file_path),
|
|
caption=info_text,
|
|
parse_mode="HTML",
|
|
reply_markup=keyboard,
|
|
)
|
|
await status_msg.delete()
|
|
await state.clear()
|