b553c957f3
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
231 lines
10 KiB
Python
231 lines
10 KiB
Python
# database/db_manager.py
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
from contextlib import contextmanager
|
|
|
|
from utils.logger import logger
|
|
from database.schema import PRAGMA_FOREIGN_KEYS, CREATE_TABLES_SQL, CREATE_INDEXES_SQL
|
|
|
|
class DBManager:
    """SQLite-backed storage for folders, files and per-file metadata.

    Every public method opens its own short-lived connection through
    ``_get_connection``, runs its statements, commits and closes.  A
    ``sqlite3.Error`` is logged and turned into a neutral return value
    (``None``, ``False`` or ``[]``) instead of propagating to the caller.
    """

    def __init__(self, db_path: str = "comfygallery.db"):
        """Remember *db_path* and create the tables and indexes.

        A ``sqlite3.Error`` raised during initialization is logged by
        ``_initialize_db`` and is not re-raised here.
        """
        self.db_path = db_path
        self._initialize_db()

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Return *path* resolved to an absolute path with forward slashes."""
        return Path(path).resolve().as_posix()

    @staticmethod
    def _escape_like(text: str) -> str:
        r"""Escape LIKE wildcards in *text* so it is matched literally.

        The result is meant for a pattern used with ``ESCAPE '\'``.  The
        backslash itself is escaped first so that the backslashes added for
        ``%`` and ``_`` are not doubled afterwards.
        """
        return (
            text.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    @contextmanager
    def _get_connection(self):
        """Yield a connection to ``self.db_path`` and always close it.

        The connection has ``PRAGMA_FOREIGN_KEYS`` applied and uses
        ``sqlite3.Row`` as its row factory.  On ``sqlite3.Error`` the error
        is logged, the transaction is rolled back and the exception is
        re-raised.  Committing is the caller's responsibility.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            conn.execute(PRAGMA_FOREIGN_KEYS)
            conn.row_factory = sqlite3.Row
            yield conn
        except sqlite3.Error as e:
            logger.error(f"Ошибка соединения с БД {self.db_path}: {e}")
            if conn:
                conn.rollback()
            raise
        finally:
            if conn:
                conn.close()

    def _initialize_db(self) -> bool:
        """Run the table and index creation scripts.

        Returns:
            ``True`` on success, ``False`` if a ``sqlite3.Error`` occurred
            (the error is logged at critical level).
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.executescript(CREATE_TABLES_SQL)
                cursor.executescript(CREATE_INDEXES_SQL)
                conn.commit()
                return True
        except sqlite3.Error as e:
            logger.critical(f"Критическая ошибка инициализации БД: {e}")
            return False

    def add_folder(self, folder_path: str, parent_id: Optional[int] = None) -> Optional[int]:
        """Insert a folder, or update ``parent_id`` if the path already exists.

        Args:
            folder_path: Folder location; stored in normalized form.
            parent_id: Id of the parent folder row, or ``None``.

        Returns:
            The id of the folder row, or ``None`` for an empty path or on a
            database error.
        """
        if not folder_path:
            return None
        normalized_path = self._normalize_path(folder_path)
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO folders (path, parent_id) VALUES (?, ?) "
                    "ON CONFLICT(path) DO UPDATE SET parent_id=excluded.parent_id RETURNING id",
                    (normalized_path, parent_id)
                )
                result = cursor.fetchone()
                if not result:
                    # Fallback lookup in case the upsert returned no row.
                    cursor.execute("SELECT id FROM folders WHERE path = ?", (normalized_path,))
                    result = cursor.fetchone()
                conn.commit()
                return result[0] if result else None
        except sqlite3.Error as e:
            logger.error(f"Ошибка при добавлении папки {normalized_path}: {e}")
            return None

    def remove_folder_by_path(self, folder_path: str) -> bool:
        """Delete the folder row whose path equals the normalized *folder_path*.

        Returns:
            ``True`` if a row was deleted; ``False`` if nothing matched, the
            path is empty, or a database error occurred.
        """
        if not folder_path:
            # An empty string would resolve to the current working directory;
            # never issue a delete for that by accident.
            return False
        normalized_path = self._normalize_path(folder_path)
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM folders WHERE path = ?", (normalized_path,))
                conn.commit()
                return cursor.rowcount > 0
        except sqlite3.Error as e:
            logger.error(f"Ошибка при удалении папки {normalized_path} из БД: {e}")
            return False

    def register_file(self, folder_id: int, filename: str, filepath: str, size: int, mtime: float) -> Optional[int]:
        """Insert a file row, or refresh it if *filepath* is already known.

        On a path conflict only ``size``, ``mtime`` and ``folder_id`` are
        updated.

        Returns:
            The id of the file row, or ``None`` when *filename* or *filepath*
            is empty or a database error occurred.
        """
        if not filename or not filepath:
            return None
        normalized_filepath = self._normalize_path(filepath)
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    """
                    INSERT INTO files (folder_id, filename, filepath, size, mtime)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(filepath) DO UPDATE SET
                        size = excluded.size,
                        mtime = excluded.mtime,
                        folder_id = excluded.folder_id
                    RETURNING id
                    """,
                    (folder_id, filename, normalized_filepath, size, mtime)
                )
                result = cursor.fetchone()
                conn.commit()
                return result[0] if result else None
        except sqlite3.Error as e:
            logger.error(f"Ошибка регистрации файла {normalized_filepath}: {e}")
            return None

    def remove_file_by_path(self, filepath: str) -> bool:
        """Delete the file row whose path equals the normalized *filepath*.

        Returns:
            ``True`` if a row was deleted; ``False`` if nothing matched, the
            path is empty, or a database error occurred.
        """
        if not filepath:
            # Same guard as remove_folder_by_path: "" would resolve to the
            # current working directory.
            return False
        normalized_filepath = self._normalize_path(filepath)
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM files WHERE filepath = ?", (normalized_filepath,))
                conn.commit()
                return cursor.rowcount > 0
        except sqlite3.Error as e:
            logger.error(f"Ошибка удаления файла {normalized_filepath} из БД: {e}")
            return False

    def save_metadata(self, file_id: int, meta_payload: Dict[str, Any]) -> bool:
        """Insert or replace the metadata row of *file_id*.

        Args:
            file_id: Id of the file the metadata belongs to.
            meta_payload: Mapping read for the keys ``prompt_json``,
                ``workflow_json``, ``positive_prompt``, ``negative_prompt``,
                ``seed``, ``model_name``, ``sampler``, ``steps`` and ``cfg``.
                A missing key is stored as NULL.

        Returns:
            ``True`` on success, ``False`` on a database error.
        """
        # Order must match the column list of the INSERT below.
        fields = (
            "prompt_json", "workflow_json", "positive_prompt",
            "negative_prompt", "seed", "model_name", "sampler", "steps", "cfg",
        )
        params = (file_id, *(meta_payload.get(name) for name in fields))
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    """
                    INSERT INTO metadata (
                        file_id, prompt_json, workflow_json, positive_prompt,
                        negative_prompt, seed, model_name, sampler, steps, cfg
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(file_id) DO UPDATE SET
                        prompt_json = excluded.prompt_json,
                        workflow_json = excluded.workflow_json,
                        positive_prompt = excluded.positive_prompt,
                        negative_prompt = excluded.negative_prompt,
                        seed = excluded.seed,
                        model_name = excluded.model_name,
                        sampler = excluded.sampler,
                        steps = excluded.steps,
                        cfg = excluded.cfg
                    """,
                    params
                )
                conn.commit()
                return True
        except sqlite3.Error as e:
            logger.error(f"Ошибка при сохранении метаданных файла ID {file_id}: {e}")
            return False

    def get_files_in_folder(self, folder_path: str, search_query: str = "") -> List[Dict[str, Any]]:
        """List the files of a folder, optionally filtered by a search text.

        Args:
            folder_path: Folder location; normalized before the lookup.
            search_query: When non-empty, only files whose filename, positive
                prompt or negative prompt contains this text are returned.
                ``%`` and ``_`` in the text are matched literally.

        Returns:
            A list of dicts with the keys ``id``, ``filename``, ``filepath``,
            ``rating``, ``favorite`` and ``has_workflow`` (1 when a non-empty
            workflow is stored, else 0), ordered by filename.  An empty list
            is returned on a database error.
        """
        normalized_path = self._normalize_path(folder_path)

        sql = """
            SELECT f.id, f.filename, f.filepath, f.rating, f.favorite,
                   CASE WHEN m.workflow_json IS NOT NULL AND m.workflow_json != '' THEN 1 ELSE 0 END as has_workflow
            FROM files f
            JOIN folders fo ON f.folder_id = fo.id
            LEFT JOIN metadata m ON f.id = m.file_id
            WHERE fo.path = ?
        """
        params: List[Any] = [normalized_path]

        if search_query:
            # Escape user-typed wildcards; only the surrounding % are patterns.
            like_query = f"%{self._escape_like(search_query)}%"
            sql += (
                " AND (f.filename LIKE ? ESCAPE '\\'"
                " OR m.positive_prompt LIKE ? ESCAPE '\\'"
                " OR m.negative_prompt LIKE ? ESCAPE '\\')"
            )
            params.extend((like_query, like_query, like_query))

        sql += " ORDER BY f.filename ASC"

        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(sql, params)
                return [dict(row) for row in cursor.fetchall()]
        except sqlite3.Error as e:
            logger.error(f"Ошибка получения файлов в папке {normalized_path}: {e}")
            return []

    def get_file_details(self, file_id: int) -> Optional[Dict[str, Any]]:
        """Return the file row joined with its metadata, if any.

        Returns:
            A dict with the file columns ``id``, ``filename``, ``filepath``,
            ``rating``, ``favorite`` and the metadata columns (NULL/``None``
            when the file has no metadata row), or ``None`` when the file id
            is unknown or a database error occurred.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    """
                    SELECT f.id, f.filename, f.filepath, f.rating, f.favorite,
                           m.prompt_json, m.workflow_json, m.positive_prompt, m.negative_prompt,
                           m.seed, m.model_name, m.sampler, m.steps, m.cfg
                    FROM files f
                    LEFT JOIN metadata m ON f.id = m.file_id
                    WHERE f.id = ?
                    """,
                    (file_id,)
                )
                row = cursor.fetchone()
                return dict(row) if row else None
        except sqlite3.Error as e:
            logger.error(f"Ошибка получения деталей файла ID {file_id}: {e}")
            return None

    def update_file_details(self, file_id: int, positive: str, negative: str, rating: int) -> bool:
        """Store the user's rating and edited prompts for a file.

        The prompts are written with an upsert so they are kept even when the
        file has no metadata row yet; other metadata columns of an existing
        row are left untouched.

        Returns:
            ``True`` when the changes were committed; ``False`` when the file
            id does not exist or a database error occurred.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("UPDATE files SET rating = ? WHERE id = ?", (rating, file_id))
                if cursor.rowcount == 0:
                    # Unknown file: nothing was changed, so report failure
                    # instead of creating an orphan metadata row.
                    return False
                cursor.execute(
                    """
                    INSERT INTO metadata (file_id, positive_prompt, negative_prompt)
                    VALUES (?, ?, ?)
                    ON CONFLICT(file_id) DO UPDATE SET
                        positive_prompt = excluded.positive_prompt,
                        negative_prompt = excluded.negative_prompt
                    """,
                    (file_id, positive, negative)
                )
                conn.commit()
                return True
        except sqlite3.Error as e:
            logger.error(f"Ошибка при сохранении правок для файла ID {file_id}: {e}")
            return False