import os
import re
import json
import time
import uuid
import socket
import threading
import webbrowser
import contextlib
import traceback
import uvicorn
from typing import List
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from bs4 import BeautifulSoup
from deep_translator import GoogleTranslator
import httpx
import bleach
import markdown as md_lib

try:
    import trafilatura
except Exception:  # the library is optional — without it, importing by URL will return an error
    trafilatura = None

# ==========================================
# CONFIGURATION AND UTILITIES
# ==========================================
PORT = 8142
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(BASE_DIR, "config.json")
LANG_DIR = os.path.join(BASE_DIR, "lang")

# How long to keep served content (seconds). Readeck fetches the link almost
# immediately, but we leave a margin. Entries older than the TTL are removed so
# that CONTENT_STORE does not grow forever.
CONTENT_TTL = 3600

# Google Translate's per-request limit is ~5000 characters. Stay below it with a margin.
TRANSLATE_CHAR_LIMIT = 4500

# content_id -> {"html": str, "created": float}
CONTENT_STORE = {}
_store_lock = threading.Lock()

# Tags/attributes allowed during sanitization (article markup).
ALLOWED_TAGS = list(bleach.sanitizer.ALLOWED_TAGS) + [
    "p", "div", "span", "br", "hr", "pre",
    "h1", "h2", "h3", "h4", "h5", "h6",
    "img", "figure", "figcaption",
    "table", "thead", "tbody", "tfoot", "tr", "th", "td",
    "article", "section", "blockquote",
    "sub", "sup", "u", "s",
]
ALLOWED_ATTRS = {
    "*": ["class", "id", "title", "lang"],
    "a": ["href", "title", "rel", "target"],
    "img": ["src", "alt", "title", "width", "height"],
}


def get_lan_ip() -> str:
    """Return the local address used for outbound traffic, or ``127.0.0.1``.

    A datagram socket is "connected" to 8.8.8.8:80 and the address the OS bound
    it to is read back. Any failure falls back to the loopback address.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"


def store_content(html: str) -> str:
    """Store *html* under a new UUID and return that id.

    Entries older than ``CONTENT_TTL`` are purged on every call, under
    ``_store_lock``, so the store cannot grow without bound.
    """
    content_id = str(uuid.uuid4())
    now = time.time()
    with _store_lock:
        expired = [k for k, v in CONTENT_STORE.items() if now - v["created"] > CONTENT_TTL]
        for k in expired:
            CONTENT_STORE.pop(k, None)
        CONTENT_STORE[content_id] = {"html": html, "created": now}
    return content_id


def sanitize_html(html: str) -> str:
    """Clean *html* with bleach, keeping only ``ALLOWED_TAGS`` / ``ALLOWED_ATTRS``.

    ``strip=True`` removes disallowed markup instead of escaping it.
    """
    return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS, strip=True)


def markdown_to_html(text: str) -> str:
    """Convert Markdown to HTML using the ``extra``, ``sane_lists`` and ``nl2br`` extensions."""
    return md_lib.markdown(text, extensions=["extra", "sane_lists", "nl2br"])


def chunk_text(text: str, limit: int = TRANSLATE_CHAR_LIMIT) -> List[str]:
    """Split *text* into pieces of at most *limit* characters.

    The text is split on whitespace that follows a sentence terminator or a
    newline; the pieces are then greedily re-joined with single spaces while
    they fit into *limit*. A single piece longer than *limit* is cut hard by
    character count.

    Raises:
        ValueError: if *limit* is not positive and the text does not already
            fit (previously this case looped forever).
    """
    if len(text) <= limit:
        return [text]
    if limit <= 0:
        # The hard-cut loop below would never shrink `part` with a
        # non-positive limit.
        raise ValueError("limit must be a positive integer")

    parts = re.split(r"(?<=[.!?。…\n])\s+", text)
    chunks, buf = [], ""
    for part in parts:
        # A single "sentence" is longer than the limit — cut it hard by characters.
        while len(part) > limit:
            if buf:
                chunks.append(buf)
                buf = ""
            chunks.append(part[:limit])
            part = part[limit:]
        # +1 accounts for the joining space.
        if len(buf) + len(part) + 1 <= limit:
            buf = f"{buf} {part}".strip()
        else:
            if buf:
                chunks.append(buf)
            buf = part
    if buf:
        chunks.append(buf)
    return chunks


def translate_long(text: str, target_lang: str) -> str:
    """Translate arbitrarily long *text* by translating it chunk by chunk.

    Blank input is returned unchanged. A chunk that fails to translate (or
    translates to an empty result) is kept in its original language; the
    failure is reported on stdout instead of being silently dropped. The
    translated chunks are joined with single spaces.
    """
    if not text or not text.strip():
        return text
    translator = GoogleTranslator(source="auto", target=target_lang)
    out = []
    for chunk in chunk_text(text):
        try:
            res = translator.translate(chunk)
            out.append(res if res else chunk)
        except Exception as e:
            # Best effort: keep the untranslated chunk, but make the failure visible.
            print(f"[WARNING] Не удалось перевести фрагмент, оставлен оригинал: {e}")
            out.append(chunk)
    return " ".join(out)


def extract_metadata_from_html(html: str) -> dict:
    """Extract title/authors/description/site_name/date from HTML meta tags.

    ``og:title`` takes precedence over ``<title>``. Missing values are
    returned as empty strings.
    """
    soup = BeautifulSoup(html, "html.parser")

    def meta(*, name=None, prop=None):
        # Look up by `name` when given, otherwise by `property`.
        if name:
            tag = soup.find("meta", attrs={"name": name})
        else:
            tag = soup.find("meta", attrs={"property": prop})
        return tag.get("content", "").strip() if tag and tag.get("content") else ""

    title = ""
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    title = meta(prop="og:title") or title

    return {
        "title": title,
        "authors": meta(name="author") or meta(prop="article:author"),
        "description": meta(name="description") or meta(prop="og:description"),
        "site_name": meta(prop="og:site_name"),
        "date": meta(prop="article:published_time") or meta(name="date"),
    }


def get_available_languages() -> list:
    """Scan ``LANG_DIR`` and return the available languages.

    Each ``*.json`` file yields ``{"code", "name", "native_name"}``; ``name``
    comes from the file's ``lang_name`` key (falling back to the code), and
    ``native_name`` from a ``native_name`` key when present, otherwise it
    equals ``name``. Files are visited in sorted order so the result is
    deterministic. Unreadable files are reported and skipped.
    """
    if not os.path.exists(LANG_DIR):
        return []

    languages = []
    try:
        for filename in sorted(os.listdir(LANG_DIR)):
            if not filename.endswith('.json'):
                continue
            lang_code = filename[:-5]  # drop ".json"
            filepath = os.path.join(LANG_DIR, filename)
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                display_name = data.get('lang_name', lang_code)
                languages.append({
                    'code': lang_code,
                    'name': display_name,
                    'native_name': data.get('native_name', display_name),
                })
            except Exception as e:
                print(f"[WARNING] Не удалось загрузить {filename}: {e}")
    except Exception as e:
        print(f"[WARNING] Ошибка при сканировании папки lang: {e}")

    return languages


def load_language(lang_code: str) -> dict:
    """Load the localization file ``<LANG_DIR>/<lang_code>.json``.

    Returns an empty dict when the code is empty, contains a path component,
    the file does not exist, or it cannot be parsed.
    """
    # The code may come from user-editable settings: refuse anything that
    # would escape LANG_DIR (e.g. "../config").
    if not lang_code or os.path.basename(lang_code) != lang_code:
        return {}

    filepath = os.path.join(LANG_DIR, f"{lang_code}.json")
    if not os.path.exists(filepath):
        return {}

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"[WARNING] Не удалось загрузить локализацию {lang_code}: {e}")
        return {}


def load_config() -> dict:
    """Return the settings: built-in defaults overlaid with ``config.json``.

    A missing, unreadable or malformed file is reported on stdout and the
    defaults are returned; this function does not raise for those cases.
    """
    default_config = {
        "readeck_url": "",
        "readeck_token": "",
        "public_host": get_lan_ip(),
        "language": "ru"
    }
    if os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
                # Ignore valid JSON that is not an object.
                if isinstance(data, dict):
                    default_config.update(data)
        except json.JSONDecodeError as e:
            print(f"\n[WARNING] ОШИБКА В config.json! Файл содержит неверный формат JSON: {e}")
            print("[WARNING] Проверьте, нет ли там лишних запятых или пропущенных кавычек.\n")
        except Exception as e:
            print(f"\n[WARNING] Не удалось прочитать config.json: {e}\n")
    else:
        print(f"\n[INFO] Файл {CONFIG_FILE} не найден. Используются пустые настройки.\n")
    return default_config


def save_config(config: dict):
    """Write *config* to ``config.json``.

    Raises:
        HTTPException: status 500 when the file cannot be written; the
            original error is chained as the cause.
    """
    try:
        with open(CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=4)
    except Exception as e:
        print(f"\n[ERROR] Не удалось сохранить config.json: {e}\n")
        raise HTTPException(status_code=500, detail=f"Ошибка записи в файл настроек: {e}") from e


# ==========================================
# APPLICATION INITIALIZATION
# ==========================================
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    """On startup, schedule the default browser to open the UI after 1.5 seconds."""
    def open_browser():
        webbrowser.open(f"http://127.0.0.1:{PORT}")

    threading.Timer(1.5, open_browser).start()
    yield


app = FastAPI(title="Readeck Local Importer", lifespan=lifespan)


# ==========================================
# DATA MODELS
# ==========================================
class SettingsModel(BaseModel):
    # Same keys as the defaults built in load_config().
    readeck_url: str = ""
    readeck_token: str = ""
    public_host: str = ""
    language: str = "ru"


class TranslateRequest(BaseModel):
    content: str
    target_lang: str = "ru"


class FetchUrlRequest(BaseModel):
    url: str


class ExtractMetaRequest(BaseModel):
    content: str


class MarkdownRequest(BaseModel):
    content: str


class SubmitRequest(BaseModel):
    content: str
    title: str = ""
    description: str = ""
    authors: str = ""
    site_name: str = ""
    date: str = ""
    language: str = "ru"
    tags: List[str] = []
    favorite: bool = False
    archive: bool = False
    content_format: str = "html"  # html | markdown | text


# ==========================================
# FRONTEND (HTML / JS)
# ==========================================
# NOTE(review): the HTML_TEMPLATE definition follows here in the original
# file; its body is not readable in this view and is intentionally not
# reproduced.
{{ t('app_subtitle') }}
{p}
" for p in paragraphs) or f"{escaped}
def inject_metadata(html_content: str, meta: dict) -> str:
    """Return *html_content* as a document with *meta* written into ``<head>``.

    A fragment without an ``<html>`` element is wrapped in a minimal
    ``<html><head/><body/></html>`` skeleton; a document without ``<head>``
    gets one inserted. ``<html lang>`` is always set (default ``"ru"``).
    ``title``, ``description``, ``authors``, ``site_name`` and ``date`` are
    written only when non-empty; existing tags are updated in place.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    if not soup.html:
        # The skeleton must already contain <head> and <body>: parsing an
        # empty string yields a document where `wrapper.body` is None.
        wrapper = BeautifulSoup("<html><head></head><body></body></html>", "html.parser")
        wrapper.body.append(soup)
        soup = wrapper
    elif not soup.head:
        head = soup.new_tag("head")
        soup.html.insert(0, head)

    soup.html["lang"] = meta.get("language", "ru")

    def set_meta(attrs: dict):
        # Match on every attribute except `content`, then update or create.
        search_attrs = {k: v for k, v in attrs.items() if k != "content"}
        tag = soup.head.find("meta", attrs=search_attrs)
        if tag:
            tag["content"] = attrs["content"]
        else:
            new_tag = soup.new_tag("meta")
            new_tag.attrs.update(attrs)
            soup.head.append(new_tag)

    if meta.get("title"):
        if soup.head.title:
            soup.head.title.string = meta["title"]
        else:
            t_tag = soup.new_tag("title")
            t_tag.string = meta["title"]
            soup.head.append(t_tag)

    if meta.get("description"):
        set_meta({"name": "description", "content": meta["description"]})
    if meta.get("authors"):
        set_meta({"name": "author", "content": meta["authors"]})
    if meta.get("site_name"):
        set_meta({"property": "og:site_name", "content": meta["site_name"]})
    if meta.get("date"):
        # Open Graph keys are carried by `property`, which is also what
        # extract_metadata_from_html() looks up for this key.
        set_meta({"property": "article:published_time", "content": meta["date"]})

    return str(soup)


@app.post("/api/submit")
async def submit_bookmark(req: SubmitRequest):
    """Publish the prepared content locally and ask Readeck to bookmark it.

    The final HTML is stored under a fresh id and Readeck is given the
    callback URL ``http://<public_host>:<PORT>/content/<id>`` to fetch.

    Raises:
        HTTPException: 400 when the Readeck URL or token is not configured;
            500 when the request fails or Readeck answers with status >= 400.
    """
    config = load_config()
    readeck_url = config.get("readeck_url", "").strip("/")
    readeck_token = config.get("readeck_token", "")
    public_host = config.get("public_host", "").strip() or get_lan_ip()

    if not readeck_url or not readeck_token:
        raise HTTPException(400, "URL и Токен Readeck не настроены. Откройте настройки и сохраните их.")

    prepared = prepare_content(req.content, req.content_format)
    final_html = inject_metadata(prepared, {
        "title": req.title,
        "description": req.description,
        "authors": req.authors,
        "site_name": req.site_name,
        "date": req.date,
        "language": req.language
    })

    content_id = store_content(final_html)
    callback_url = f"http://{public_host}:{PORT}/content/{content_id}"

    payload = {
        "url": callback_url,
        "labels": req.tags,
        "favorite": req.favorite,
        "archived": req.archive
    }
    headers = {
        "Authorization": f"Bearer {readeck_token}",
        "Content-Type": "application/json"
    }

    print(f"\n[DEBUG] --- НАЧАЛО ОТПРАВКИ ---")
    print(f"[DEBUG] URL: {readeck_url}/api/bookmarks")

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(
                f"{readeck_url}/api/bookmarks",
                json=payload,
                headers=headers,
                timeout=45.0,
                follow_redirects=True
            )
            print(f"[DEBUG] Статус ответа: {resp.status_code}")

            if resp.status_code >= 400:
                raise Exception(f"Readeck отклонил запрос (Код {resp.status_code}). Ответ: {resp.text}")

            try:
                data = resp.json()
            except Exception:
                # A successful response without a JSON body is still a success.
                data = {"id": "Успешно, но сервер не вернул JSON"}

            print(f"[DEBUG] --- УСПЕШНО --- \n")
            return {"success": True, "bookmark": data}

        except Exception as e:
            print("\n!!! ОШИБКА READECK API !!!")
            traceback.print_exc()
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
            raise HTTPException(500, detail=f"{type(e).__name__}: {str(e)}") from e


class PreviewRequest(BaseModel):
    # Same content/metadata fields as SubmitRequest, without the bookmark flags.
    content: str
    title: str = ""
    description: str = ""
    authors: str = ""
    site_name: str = ""
    date: str = ""
    language: str = "ru"
    content_format: str = "html"


@app.post("/api/preview")
def preview(req: PreviewRequest):
    """Build the final HTML (as on submit) and put it into the temporary store.

    The result can be viewed through ``/content/{id}`` — exactly the document
    Readeck would be given.
    """
    prepared = prepare_content(req.content, req.content_format)
    final_html = inject_metadata(prepared, {
        "title": req.title,
        "description": req.description,
        "authors": req.authors,
        "site_name": req.site_name,
        "date": req.date,
        "language": req.language,
    })
    content_id = store_content(final_html)
    return {"id": content_id, "url": f"/content/{content_id}"}


@app.get("/content/{content_id}")
def get_content(content_id: str):
    """Serve stored HTML by id.

    Raises:
        HTTPException: 404 when the id is unknown or the entry is older than
            ``CONTENT_TTL`` (an expired entry is removed).
    """
    # Use the same lock as store_content() so lookup and purge cannot
    # interleave with a concurrent write.
    with _store_lock:
        entry = CONTENT_STORE.get(content_id)
        if not entry or time.time() - entry["created"] > CONTENT_TTL:
            CONTENT_STORE.pop(content_id, None)
            entry = None
    if entry is None:
        raise HTTPException(404, "Content not found or expired")
    return HTMLResponse(content=entry["html"], media_type="text/html; charset=utf-8")


if __name__ == "__main__":
    print(f"[*] Starting Readeck Local Importer on http://0.0.0.0:{PORT}")
    print(f"[*] Your LAN IP (for firewall/callback) is: {get_lan_ip()}")
    uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info", reload=False)