"""Обработчики генерации изображений (txt2img).""" import logging import os import uuid from datetime import datetime from html import escape from aiogram import Router, F from aiogram.types import ( Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, FSInputFile, ) from aiogram.fsm.context import FSMContext from aiogram.fsm.state import State, StatesGroup from config import settings from database.database import db from sd.sd_client import sd_client from utils.image_manager import get_image_path router = Router() logger = logging.getLogger(__name__) class Txt2ImgState(StatesGroup): """Состояния для процесса txt2img.""" waiting_for_prompt = State() waiting_for_negative_prompt = State() generating = State() def get_generation_keyboard() -> InlineKeyboardMarkup: """Клавиатура для генерации txt2img.""" return InlineKeyboardMarkup(inline_keyboard=[ [ InlineKeyboardButton(text="📝 Ввести промпт", callback_data="txt2img_enter_prompt"), ], [ InlineKeyboardButton(text="📋 Использовать профиль", callback_data="txt2img_use_profile"), ], [ InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu"), ], ]) @router.callback_query(F.data == "gen_txt2img") async def start_txt2img(callback: CallbackQuery, state: FSMContext): """Начать процесс txt2img.""" await state.clear() await callback.message.answer( "🎨 Генерация изображения (txt2img)\n\n" "Введите описание изображения (промпт).\n" "Можете использовать /cancel для отмены.", parse_mode="HTML", reply_markup=get_generation_keyboard(), ) await callback.answer() @router.callback_query(F.data == "txt2img_enter_prompt") async def enter_prompt_txt(callback: CallbackQuery, state: FSMContext): """Ввод промпта для txt2img.""" await state.set_state(Txt2ImgState.waiting_for_prompt) await callback.message.edit_text( "📝 Введите промпт:\n\n" "Опишите изображение, которое хотите сгенерировать.\n" "Используйте /cancel для отмены.", parse_mode="HTML", ) await callback.answer() @router.callback_query(F.data == "txt2img_use_profile") async def use_profile_txt(callback: CallbackQuery, state: FSMContext): """Использовать профиль для txt2img.""" profiles = await db.get_user_profiles(callback.from_user.id) if not profiles: await callback.answer("У вас пока нет профилей. Создайте профиль в разделе 'Профили'.", show_alert=True) return keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text=p["name"], callback_data=f"txt2img_profile_{p['id']}")] for p in profiles ]) keyboard.inline_keyboard.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="gen_txt2img")]) await state.set_state(Txt2ImgState.waiting_for_prompt) await callback.message.edit_text( "📋 Выберите профиль:\n\n" "После выбора будет запрошен промпт.", parse_mode="HTML", reply_markup=keyboard, ) await callback.answer() @router.callback_query(F.data.startswith("txt2img_profile_")) async def select_profile_txt(callback: CallbackQuery, state: FSMContext): """Выбор профиля для txt2img.""" profile_id = int(callback.data.split("_")[-1]) await state.update_data(profile_id=profile_id) profile = await db.get_profile(profile_id) await state.set_state(Txt2ImgState.waiting_for_prompt) await callback.message.edit_text( f"✅ Профиль {profile['name']} выбран.\n\n" f"Параметры: {profile['width']}x{profile['height']}, " f"шагов: {profile['steps']}, CFG: {profile['cfg_scale']}, " f"сэмплер: {profile['sampler']}, шедулер: {profile.get('scheduler', 'automatic')}\n\n" f"📝 Введите промпт:\n" f"Используйте /cancel для отмены.", parse_mode="HTML", ) await callback.answer() @router.message(Txt2ImgState.waiting_for_prompt, F.text) async def process_prompt_txt(message: Message, state: FSMContext): """Обработка введённого промпта для txt2img.""" prompt = message.text.strip() if not prompt: await message.answer("Промпт не может быть пустым. Введите описание изображения.") return await state.update_data(prompt=prompt) # Запрашиваем негативный промпт await state.set_state(Txt2ImgState.waiting_for_negative_prompt) await message.answer( "🚫 Введите негативный промпт (необязательно)\n\n" "Опишите, что НЕ должно быть на изображении.\n" "Отправьте - чтобы пропустить.\n" "Используйте /cancel для отмены.", parse_mode="HTML", ) @router.message(Txt2ImgState.waiting_for_negative_prompt) async def process_negative_prompt_txt(message: Message, state: FSMContext): """Обработка негативного промпта для txt2img.""" negative_prompt = message.text.strip() if negative_prompt == "-": negative_prompt = "" await state.update_data(negative_prompt=negative_prompt) await state.set_state(Txt2ImgState.generating) # Получаем данные data = await state.get_data() prompt = data.get("prompt", "") profile_id = data.get("profile_id") # Загружаем настройки пользователя для TTL user_settings = await db.get_user_settings(message.from_user.id) ttl_hours = user_settings.get("image_ttl_hours", settings.DEFAULT_IMAGE_TTL_HOURS) # Если есть профиль, используем его настройки profile = None if profile_id: profile = await db.get_profile(profile_id) # Отправляем сообщение о начале генерации status_msg = await message.answer("⏳ Генерация изображения...\n\nЭто может занять некоторое время.", parse_mode="HTML") # Вызываем API result = await sd_client.txt2img( prompt=prompt, negative_prompt=negative_prompt or (profile["negative_prompt"] if profile else ""), width=profile["width"] if profile else 512, height=profile["height"] if profile else 512, steps=profile["steps"] if profile else 20, cfg_scale=profile["cfg_scale"] if profile else 7.0, sampler=profile["sampler"] if profile else "Euler a", scheduler=profile.get("scheduler", "automatic") if profile else "automatic", model=profile.get("model") if profile else None, lora=profile.get("lora") if profile and profile.get("lora") else None, lora_strength=profile.get("lora_strength", 0.8) if profile else 0.8, ) if result is None: await status_msg.edit_text("❌ Ошибка генерации.\n\nПроверьте соединение с SD API и попробуйте снова.") await state.clear() return image_bytes, info = result # Сохраняем изображение filename = f"{uuid.uuid4().hex[:12]}.png" file_path = get_image_path(message.from_user.id, filename) try: with open(file_path, "wb") as f: f.write(image_bytes) except Exception as e: logger.error(f"Ошибка сохранения изображения: {e}") await status_msg.edit_text("❌ Ошибка сохранения изображения.") await state.clear() return # Сохраняем в БД await db.add_generated_image( user_id=message.from_user.id, file_path=file_path, prompt=prompt, ttl_hours=ttl_hours, ) # Увеличиваем счётчик генераций await db.increment_generation_count(message.from_user.id) # Отправляем изображение info_text = ( f"✅ Изображение сгенерировано!\n\n" f"Промпт: {escape(info['prompt'][:500])}\n" f"Размер: {info['width']}x{info['height']}\n" f"Шагов: {info['steps']}\n" f"CFG Scale: {info['cfg_scale']}\n" f"Сэмплер: {info['sampler']}\n" f"Шедулер: {info.get('scheduler', 'automatic')}\n" f"Seed: {info['seed']}\n" f"Модель: {info['model']}\n" f"Время хранения: {ttl_hours} ч." ) keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="🔄 Сгенерировать ещё", callback_data="gen_txt2img")], [InlineKeyboardButton(text="📋 Главное меню", callback_data="main_menu")], ]) await message.answer_photo( photo=FSInputFile(file_path), caption=info_text, parse_mode="HTML", reply_markup=keyboard, ) await status_msg.delete() await state.clear()