import os import re import json import time import uuid import socket import threading import webbrowser import contextlib import traceback import uvicorn from typing import List from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel from bs4 import BeautifulSoup from deep_translator import GoogleTranslator import httpx import bleach import markdown as md_lib try: import trafilatura except Exception: # библиотека опциональна — без неё импорт по URL вернёт ошибку trafilatura = None # ========================================== # КОНФИГУРАЦИЯ И УТИЛИТЫ # ========================================== PORT = 8142 BASE_DIR = os.path.dirname(os.path.abspath(__file__)) CONFIG_FILE = os.path.join(BASE_DIR, "config.json") LANG_DIR = os.path.join(BASE_DIR, "lang") # Сколько хранить отданный контент (сек). Readeck забирает ссылку почти сразу, # но даём запас. Записи старше TTL удаляются, чтобы CONTENT_STORE не рос вечно. CONTENT_TTL = 3600 # Лимит Google Translate на один запрос ~5000 символов. Берём с запасом. TRANSLATE_CHAR_LIMIT = 4500 # content_id -> {"html": str, "created": float} CONTENT_STORE = {} _store_lock = threading.Lock() # Разрешённые при санитизации теги/атрибуты (статейная разметка). ALLOWED_TAGS = list(bleach.sanitizer.ALLOWED_TAGS) + [ "p", "div", "span", "br", "hr", "pre", "h1", "h2", "h3", "h4", "h5", "h6", "img", "figure", "figcaption", "table", "thead", "tbody", "tfoot", "tr", "th", "td", "article", "section", "blockquote", "sub", "sup", "u", "s", ] ALLOWED_ATTRS = { "*": ["class", "id", "title", "lang"], "a": ["href", "title", "rel", "target"], "img": ["src", "alt", "title", "width", "height"], } def get_lan_ip() -> str: try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) return s.getsockname()[0] except Exception: return "127.0.0.1" def store_content(html: str) -> str: """Сохраняет HTML под новым UUID, попутно подчищая протухшие записи.""" content_id = str(uuid.uuid4()) now = time.time() with _store_lock: expired = [k for k, v in CONTENT_STORE.items() if now - v["created"] > CONTENT_TTL] for k in expired: CONTENT_STORE.pop(k, None) CONTENT_STORE[content_id] = {"html": html, "created": now} return content_id def sanitize_html(html: str) -> str: """Чистит HTML от потенциально опасных тегов/атрибутов перед публикацией.""" return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS, strip=True) def markdown_to_html(text: str) -> str: """Конвертирует Markdown в HTML (с таблицами и блоками кода).""" return md_lib.markdown(text, extensions=["extra", "sane_lists", "nl2br"]) def chunk_text(text: str, limit: int = TRANSLATE_CHAR_LIMIT) -> List[str]: """Режет длинный текст на куски <= limit символов, по возможности по границам предложений/слов, чтобы не упереться в лимит Google на один запрос.""" if len(text) <= limit: return [text] parts = re.split(r"(?<=[.!?。…\n])\s+", text) chunks, buf = [], "" for part in parts: # Одно «предложение» само длиннее лимита — режем жёстко по символам. while len(part) > limit: if buf: chunks.append(buf) buf = "" chunks.append(part[:limit]) part = part[limit:] if len(buf) + len(part) + 1 <= limit: buf = f"{buf} {part}".strip() else: if buf: chunks.append(buf) buf = part if buf: chunks.append(buf) return chunks def translate_long(text: str, target_lang: str) -> str: """Переводит произвольно длинный текст, разбивая его на куски под лимит Google.""" if not text or not text.strip(): return text translator = GoogleTranslator(source="auto", target=target_lang) out = [] for chunk in chunk_text(text): try: res = translator.translate(chunk) out.append(res if res else chunk) except Exception: out.append(chunk) return " ".join(out) def extract_metadata_from_html(html: str) -> dict: """Достаёт title/author/description/site_name/date из HTML-метатегов.""" soup = BeautifulSoup(html, "html.parser") def meta(*, name=None, prop=None): if name: tag = soup.find("meta", attrs={"name": name}) else: tag = soup.find("meta", attrs={"property": prop}) return tag.get("content", "").strip() if tag and tag.get("content") else "" title = "" if soup.title and soup.title.string: title = soup.title.string.strip() title = meta(prop="og:title") or title return { "title": title, "authors": meta(name="author") or meta(prop="article:author"), "description": meta(name="description") or meta(prop="og:description"), "site_name": meta(prop="og:site_name"), "date": meta(prop="article:published_time") or meta(name="date"), } def get_available_languages() -> list: """Сканирует папку lang и возвращает список доступных языков.""" if not os.path.exists(LANG_DIR): return [] languages = [] try: for filename in os.listdir(LANG_DIR): if filename.endswith('.json'): lang_code = filename[:-5] # убираем .json filepath = os.path.join(LANG_DIR, filename) try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) languages.append({ 'code': lang_code, 'name': data.get('lang_name', lang_code), 'native_name': data.get('lang_name', lang_code) }) except Exception as e: print(f"[WARNING] Не удалось загрузить {filename}: {e}") except Exception as e: print(f"[WARNING] Ошибка при сканировании папки lang: {e}") return languages def load_language(lang_code: str) -> dict: """Загружает файл локализации для указанного языка.""" filepath = os.path.join(LANG_DIR, f"{lang_code}.json") if not os.path.exists(filepath): return {} try: with open(filepath, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"[WARNING] Не удалось загрузить локализацию {lang_code}: {e}") return {} def load_config() -> dict: default_config = { "readeck_url": "", "readeck_token": "", "public_host": get_lan_ip(), "language": "ru" } if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): default_config.update(data) except json.JSONDecodeError as e: print(f"\n[WARNING] ОШИБКА В config.json! Файл содержит неверный формат JSON: {e}") print("[WARNING] Проверьте, нет ли там лишних запятых или пропущенных кавычек.\n") except Exception as e: print(f"\n[WARNING] Не удалось прочитать config.json: {e}\n") else: print(f"\n[INFO] Файл {CONFIG_FILE} не найден. Используются пустые настройки.\n") return default_config def save_config(config: dict): try: with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(config, f, indent=4) except Exception as e: print(f"\n[ERROR] Не удалось сохранить config.json: {e}\n") raise HTTPException(status_code=500, detail=f"Ошибка записи в файл настроек: {e}") # ========================================== # ИНИЦИАЛИЗАЦИЯ ПРИЛОЖЕНИЯ # ========================================== @contextlib.asynccontextmanager async def lifespan(app: FastAPI): def open_browser(): webbrowser.open(f"http://127.0.0.1:{PORT}") threading.Timer(1.5, open_browser).start() yield app = FastAPI(title="Readeck Local Importer", lifespan=lifespan) # ========================================== # МОДЕЛИ ДАННЫХ # ========================================== class SettingsModel(BaseModel): readeck_url: str = "" readeck_token: str = "" public_host: str = "" language: str = "ru" class TranslateRequest(BaseModel): content: str target_lang: str = "ru" class FetchUrlRequest(BaseModel): url: str class ExtractMetaRequest(BaseModel): content: str class MarkdownRequest(BaseModel): content: str class SubmitRequest(BaseModel): content: str title: str = "" description: str = "" authors: str = "" site_name: str = "" date: str = "" language: str = "ru" tags: List[str] = [] favorite: bool = False archive: bool = False content_format: str = "html" # html | markdown | text # ========================================== # ФРОНТЕНД (HTML / JS) # ========================================== HTML_TEMPLATE = """ Readeck Local Importer

📚 {{ t('app_title') }}

{{ t('app_subtitle') }}

1

{{ t('section_upload') }}

{{ charCount }} {{ t('content_stats', { words: wordCount }) }}
🌐 {{ t('translation') }}
2

{{ t('section_metadata') }}

3

{{ t('section_readeck') }}

{{ resultMessage }}
⚙️

{{ t('settings_title') }}

{{ testMessage }}
👁️

{{ t('preview_title') }}

{{ t('preview_new_tab') }}
""" # ========================================== # БЭКЕНД: ЭНДПОИНТЫ API # ========================================== @app.get("/", response_class=HTMLResponse) def index(): return HTMLResponse(HTML_TEMPLATE) @app.get("/api/settings") def get_settings(): return load_config() @app.post("/api/settings") def update_settings(settings: SettingsModel): config_dict = settings.model_dump() if hasattr(settings, "model_dump") else settings.dict() save_config(config_dict) return {"status": "ok"} @app.get("/api/languages") def get_languages(): """Возвращает список доступных языков интерфейса.""" return {"languages": get_available_languages()} @app.get("/api/language/{lang_code}") def get_language(lang_code: str): """Возвращает строки локализации для указанного языка.""" translations = load_language(lang_code) if not translations: raise HTTPException(404, f"Язык {lang_code} не найден") return translations @app.post("/api/upload") async def upload_file(file: UploadFile = File(...)): content = await file.read() for enc in ["utf-8", "windows-1251", "latin-1"]: try: text = content.decode(enc) return {"content": text} except UnicodeDecodeError: continue raise HTTPException(400, "Невозможно прочитать кодировку файла.") @app.post("/api/translate") def translate_api(req: TranslateRequest): soup = BeautifulSoup(req.content, "html.parser") # Простой текст без тегов — переводим целиком с учётом лимита Google. if not soup.find(): return {"translated": translate_long(req.content, req.target_lang)} translator = GoogleTranslator(source='auto', target=req.target_lang) nodes_to_translate = [] texts = [] for node in soup.find_all(string=True): if node.parent.name not in ['style', 'script', 'head'] and node.strip(): nodes_to_translate.append(node) texts.append(node.strip()) if not texts: return {"translated": req.content} # Группируем текстовые узлы в батчи так, чтобы суммарная длина батча # не превышала лимит Google на один запрос. Длинные узлы переводим # по отдельности через translate_long (он сам режет на части). translated_texts = [] batch, batch_len = [], 0 def flush_batch(): nonlocal batch, batch_len if not batch: return try: res = translator.translate_batch(batch) translated_texts.extend(res) except Exception: for text in batch: try: translated_texts.append(translator.translate(text)) except Exception: translated_texts.append(text) batch, batch_len = [], 0 for text in texts: if len(text) > TRANSLATE_CHAR_LIMIT: flush_batch() translated_texts.append(translate_long(text, req.target_lang)) continue # +1 на разделитель; держим батч под лимитом и не длиннее 20 узлов. if batch_len + len(text) + 1 > TRANSLATE_CHAR_LIMIT or len(batch) >= 20: flush_batch() batch.append(text) batch_len += len(text) + 1 flush_batch() for i, node in enumerate(nodes_to_translate): if i < len(translated_texts) and translated_texts[i]: node.replace_with(translated_texts[i]) return {"translated": str(soup)} @app.post("/api/fetch-url") async def fetch_url(req: FetchUrlRequest): """Скачивает страницу по URL и извлекает из неё чистый текст статьи + метаданные.""" url = req.url.strip() if not url: raise HTTPException(400, "URL не указан.") if not url.startswith(("http://", "https://")): url = "https://" + url if trafilatura is None: raise HTTPException(500, "Библиотека trafilatura не установлена (pip install trafilatura).") headers = {"User-Agent": "Mozilla/5.0 (compatible; ReadeckImporter/1.0)"} try: async with httpx.AsyncClient(follow_redirects=True, timeout=30.0, headers=headers) as client: resp = await client.get(url) resp.raise_for_status() raw_html = resp.text except Exception as e: raise HTTPException(400, f"Не удалось загрузить страницу: {type(e).__name__}: {e}") # Извлекаем основной контент как HTML (с заголовками/ссылками/картинками). extracted = trafilatura.extract( raw_html, output_format="html", include_links=True, include_images=True, include_formatting=True, url=url, ) content = sanitize_html(extracted) if extracted else "" meta = extract_metadata_from_html(raw_html) if not content: # Фолбэк: если ничего не извлеклось — отдадим хотя бы текст body. soup = BeautifulSoup(raw_html, "html.parser") body = soup.body content = sanitize_html(str(body)) if body else "" return {"content": content, "meta": meta} @app.post("/api/extract-meta") def extract_meta(req: ExtractMetaRequest): """Возвращает метаданные, найденные в переданном HTML.""" return {"meta": extract_metadata_from_html(req.content)} @app.post("/api/render-markdown") def render_markdown(req: MarkdownRequest): """Конвертирует Markdown в HTML для предпросмотра/отправки.""" return {"html": markdown_to_html(req.content)} @app.post("/api/test-connection") async def test_connection(settings: SettingsModel): """Проверяет доступность Readeck и валидность токена.""" readeck_url = settings.readeck_url.strip().strip("/") readeck_token = settings.readeck_token.strip() if not readeck_url or not readeck_token: raise HTTPException(400, "Заполните URL и токен.") headers = {"Authorization": f"Bearer {readeck_token}"} try: async with httpx.AsyncClient(follow_redirects=True, timeout=15.0) as client: resp = await client.get(f"{readeck_url}/api/bookmarks?limit=1", headers=headers) except Exception as e: raise HTTPException(400, f"Нет связи с сервером: {type(e).__name__}: {e}") if resp.status_code in (401, 403): raise HTTPException(401, "Сервер доступен, но токен отклонён (401/403).") if resp.status_code >= 400: raise HTTPException(400, f"Сервер ответил кодом {resp.status_code}.") return {"ok": True, "message": "Подключение успешно: сервер и токен валидны."} def prepare_content(raw: str, content_format: str) -> str: """Готовит контент к публикации: markdown->html, затем санитизация.""" fmt = (content_format or "html").lower() if fmt == "markdown": html = markdown_to_html(raw) elif fmt == "text": # Экранируем и сохраняем переводы строк как параграфы. escaped = bleach.clean(raw, tags=[], strip=True) paragraphs = [p.strip() for p in escaped.split("\n\n") if p.strip()] html = "".join(f"

{p}

" for p in paragraphs) or f"

{escaped}

" else: html = raw return sanitize_html(html) def inject_metadata(html_content: str, meta: dict) -> str: soup = BeautifulSoup(html_content, "html.parser") if not soup.html: wrapper = BeautifulSoup("", "html.parser") wrapper.body.append(soup) soup = wrapper elif not soup.head: head = soup.new_tag("head") soup.html.insert(0, head) soup.html["lang"] = meta.get("language", "ru") def set_meta(attrs: dict): search_attrs = {k: v for k, v in attrs.items() if k != "content"} tag = soup.head.find("meta", attrs=search_attrs) if tag: tag["content"] = attrs["content"] else: new_tag = soup.new_tag("meta") new_tag.attrs.update(attrs) soup.head.append(new_tag) if meta.get("title"): if soup.head.title: soup.head.title.string = meta["title"] else: t_tag = soup.new_tag("title") t_tag.string = meta["title"] soup.head.append(t_tag) if meta.get("description"): set_meta({"name": "description", "content": meta["description"]}) if meta.get("authors"): set_meta({"name": "author", "content": meta["authors"]}) if meta.get("site_name"): set_meta({"property": "og:site_name", "content": meta["site_name"]}) if meta.get("date"): set_meta({"name": "article:published_time", "content": meta["date"]}) return str(soup) @app.post("/api/submit") async def submit_bookmark(req: SubmitRequest): config = load_config() readeck_url = config.get("readeck_url", "").strip("/") readeck_token = config.get("readeck_token", "") public_host = config.get("public_host", "").strip() or get_lan_ip() if not readeck_url or not readeck_token: raise HTTPException(400, "URL и Токен Readeck не настроены. Откройте настройки и сохраните их.") prepared = prepare_content(req.content, req.content_format) final_html = inject_metadata(prepared, { "title": req.title, "description": req.description, "authors": req.authors, "site_name": req.site_name, "date": req.date, "language": req.language }) content_id = store_content(final_html) callback_url = f"http://{public_host}:{PORT}/content/{content_id}" payload = { "url": callback_url, "labels": req.tags, "favorite": req.favorite, "archived": req.archive } headers = { "Authorization": f"Bearer {readeck_token}", "Content-Type": "application/json" } print(f"\n[DEBUG] --- НАЧАЛО ОТПРАВКИ ---") print(f"[DEBUG] URL: {readeck_url}/api/bookmarks") async with httpx.AsyncClient() as client: try: resp = await client.post( f"{readeck_url}/api/bookmarks", json=payload, headers=headers, timeout=45.0, follow_redirects=True ) print(f"[DEBUG] Статус ответа: {resp.status_code}") if resp.status_code >= 400: raise Exception(f"Readeck отклонил запрос (Код {resp.status_code}). Ответ: {resp.text}") try: data = resp.json() except Exception: data = {"id": "Успешно, но сервер не вернул JSON"} print(f"[DEBUG] --- УСПЕШНО --- \n") return {"success": True, "bookmark": data} except Exception as e: print("\n!!! ОШИБКА READECK API !!!") traceback.print_exc() print("!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") raise HTTPException(500, detail=f"{type(e).__name__}: {str(e)}") class PreviewRequest(BaseModel): content: str title: str = "" description: str = "" authors: str = "" site_name: str = "" date: str = "" language: str = "ru" content_format: str = "html" @app.post("/api/preview") def preview(req: PreviewRequest): """Готовит финальный HTML (как при отправке) и кладёт во временное хранилище для просмотра через /content/{id} — точно так же, как его увидит Readeck.""" prepared = prepare_content(req.content, req.content_format) final_html = inject_metadata(prepared, { "title": req.title, "description": req.description, "authors": req.authors, "site_name": req.site_name, "date": req.date, "language": req.language, }) content_id = store_content(final_html) return {"id": content_id, "url": f"/content/{content_id}"} @app.get("/content/{content_id}") def get_content(content_id: str): entry = CONTENT_STORE.get(content_id) if not entry or time.time() - entry["created"] > CONTENT_TTL: CONTENT_STORE.pop(content_id, None) raise HTTPException(404, "Content not found or expired") return HTMLResponse(content=entry["html"], media_type="text/html; charset=utf-8") if __name__ == "__main__": print(f"[*] Starting Readeck Local Importer on http://0.0.0.0:{PORT}") print(f"[*] Your LAN IP (for firewall/callback) is: {get_lan_ip()}") uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info", reload=False)