"""FastAPI application: web UI + JSON API for the RSS → ntfy bridge.""" from __future__ import annotations import logging from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone from pathlib import Path from fastapi import Depends, FastAPI, Form, HTTPException, Request, UploadFile from fastapi.responses import ( FileResponse, HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse, Response, ) from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sqlalchemy import Integer, text from sqlmodel import Session, func, select from starlette.middleware.sessions import SessionMiddleware from . import config, ntfy, opml, scheduler from .auth import hash_password, verify_password from .checker import check_feed, fetch_preview from .database import engine, get_session, get_settings, init_db from .models import Article, Category, Feed, Notification, SeenEntry, User from .schemas import CategoryIn, FeedIn, PreviewIn, SettingsIn, TestIn, UserIn logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) log = logging.getLogger("app") BASE_DIR = Path(__file__).parent templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) @asynccontextmanager async def lifespan(app: FastAPI): init_db() with Session(engine) as session: interval = get_settings(session).check_interval scheduler.start(interval) log.info("Приложение запущено") yield scheduler.shutdown() app = FastAPI(title="RSS → ntfy", lifespan=lifespan) app.add_middleware( SessionMiddleware, secret_key=config.SECRET_KEY, max_age=60 * 60 * 24 * 14 ) app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static") # --------------------------------------------------------------------------- # # Auth helpers # --------------------------------------------------------------------------- # def _current_user(request: Request, session: Session) -> User | None: uid = 
request.session.get("uid") if uid is None: return None return session.get(User, uid) def _auth_on(session: Session) -> bool: return get_settings(session).auth_enabled def require_auth(request: Request, session: Session = Depends(get_session)) -> User: """Any logged-in user (or anyone when auth is disabled).""" if not _auth_on(session): # Auth disabled → act as a virtual admin. return User(id=0, username="anonymous", role="admin") user = _current_user(request, session) if user is None: raise HTTPException(401, "Требуется авторизация") return user def require_admin(user: User = Depends(require_auth)) -> User: if user.role != "admin": raise HTTPException(403, "Требуются права администратора") return user # --------------------------------------------------------------------------- # # Pages # --------------------------------------------------------------------------- # @app.get("/", response_class=HTMLResponse) def index(request: Request, session: Session = Depends(get_session)): if _auth_on(session) and _current_user(request, session) is None: return RedirectResponse("/login", status_code=302) return templates.TemplateResponse("index.html", {"request": request}) @app.get("/login", response_class=HTMLResponse) def login_page(request: Request, session: Session = Depends(get_session)): if not _auth_on(session) or _current_user(request, session) is not None: return RedirectResponse("/", status_code=302) return templates.TemplateResponse("login.html", {"request": request, "error": None}) @app.post("/login", response_class=HTMLResponse) def login_submit( request: Request, username: str = Form(...), password: str = Form(...), session: Session = Depends(get_session), ): user = session.exec(select(User).where(User.username == username)).first() if user and verify_password(password, user.password_hash): request.session["uid"] = user.id return RedirectResponse("/", status_code=302) return templates.TemplateResponse( "login.html", {"request": request, "error": "Неверный логин 
или пароль"}, status_code=401, ) @app.get("/logout") def logout(request: Request): request.session.clear() return RedirectResponse("/login", status_code=302) @app.get("/api/me") def whoami( request: Request, session: Session = Depends(get_session), user: User = Depends(require_auth) ): return { "username": user.username, "role": user.role, "auth_enabled": _auth_on(session), } # --------------------------------------------------------------------------- # # API: feeds # --------------------------------------------------------------------------- # def _feed_dict(feed: Feed) -> dict: return { "id": feed.id, "url": feed.url, "title": feed.title, "ntfy_server": feed.ntfy_server, "ntfy_topic": feed.ntfy_topic, "ntfy_token": feed.ntfy_token, "ntfy_username": feed.ntfy_username, "ntfy_password": feed.ntfy_password, "priority": feed.priority, "tags": feed.tags, "attach_image": feed.attach_image, "to_telegram": feed.to_telegram, "to_webhook": feed.to_webhook, "filter_include": feed.filter_include, "filter_exclude": feed.filter_exclude, "interval": feed.interval, "enabled": feed.enabled, "send_full_content": feed.send_full_content, "fetch_full_article": feed.fetch_full_article, "digest_enabled": feed.digest_enabled, "digest_period_hours": feed.digest_period_hours, "category_id": feed.category_id, "last_checked": feed.last_checked.isoformat() if feed.last_checked else None, "last_status": feed.last_status, "error_streak": feed.error_streak, } @app.get("/api/feeds") def list_feeds(session: Session = Depends(get_session), _: User = Depends(require_auth)): feeds = session.exec(select(Feed).order_by(Feed.id)).all() return [_feed_dict(f) for f in feeds] @app.post("/api/feeds") def create_feed( data: FeedIn, session: Session = Depends(get_session), _: User = Depends(require_admin), ): feed_data = data.model_dump() if feed_data.get("category_id") is None: general = session.exec( select(Category).where(Category.name == "Общее") ).first() if general: feed_data["category_id"] = 
general.id feed = Feed(**feed_data) session.add(feed) session.commit() session.refresh(feed) return _feed_dict(feed) @app.put("/api/feeds/{feed_id}") def update_feed( feed_id: int, data: FeedIn, session: Session = Depends(get_session), _: User = Depends(require_admin), ): feed = session.get(Feed, feed_id) if feed is None: raise HTTPException(404, "Лента не найдена") for key, value in data.model_dump().items(): setattr(feed, key, value) session.add(feed) session.commit() session.refresh(feed) return _feed_dict(feed) @app.delete("/api/feeds/{feed_id}") def delete_feed( feed_id: int, session: Session = Depends(get_session), _: User = Depends(require_admin), ): feed = session.get(Feed, feed_id) if feed is None: raise HTTPException(404, "Лента не найдена") for entry in session.exec(select(SeenEntry).where(SeenEntry.feed_id == feed_id)).all(): session.delete(entry) for note in session.exec(select(Notification).where(Notification.feed_id == feed_id)).all(): session.delete(note) session.delete(feed) session.commit() return {"ok": True} @app.post("/api/feeds/{feed_id}/check") async def check_now( feed_id: int, session: Session = Depends(get_session), _: User = Depends(require_auth), ): feed = session.get(Feed, feed_id) if feed is None: raise HTTPException(404, "Лента не найдена") status = await check_feed(feed) session.refresh(feed) return {"status": status, "feed": _feed_dict(feed)} # --------------------------------------------------------------------------- # # API: OPML import / export # --------------------------------------------------------------------------- # @app.get("/api/feeds/export") def export_feeds(session: Session = Depends(get_session), _: User = Depends(require_auth)): feeds = session.exec(select(Feed).order_by(Feed.id)).all() xml = opml.export_opml(feeds) return Response( xml, media_type="text/x-opml", headers={"Content-Disposition": 'attachment; filename="feeds.opml"'}, ) @app.post("/api/feeds/import") async def import_feeds( file: UploadFile, session: 
Session = Depends(get_session), _: User = Depends(require_admin), ): raw = (await file.read()).decode("utf-8", errors="replace") items = opml.parse_opml(raw) existing = {f.url for f in session.exec(select(Feed)).all()} added = 0 for item in items: if item["url"] in existing: continue session.add(Feed(**item)) existing.add(item["url"]) added += 1 session.commit() return {"ok": True, "added": added, "total": len(items)} # --------------------------------------------------------------------------- # # API: categories # --------------------------------------------------------------------------- # def _category_dict(c: Category) -> dict: return {"id": c.id, "name": c.name, "sort_order": c.sort_order} @app.get("/api/categories") def list_categories( session: Session = Depends(get_session), _: User = Depends(require_auth) ): cats = session.exec(select(Category).order_by(Category.sort_order, Category.id)).all() return [_category_dict(c) for c in cats] @app.post("/api/categories") def create_category( data: CategoryIn, session: Session = Depends(get_session), _: User = Depends(require_admin), ): if session.exec(select(Category).where(Category.name == data.name)).first(): raise HTTPException(400, "Категория с таким названием уже существует") cat = Category(**data.model_dump()) session.add(cat) session.commit() session.refresh(cat) return _category_dict(cat) @app.put("/api/categories/{cat_id}") def update_category( cat_id: int, data: CategoryIn, session: Session = Depends(get_session), _: User = Depends(require_admin), ): cat = session.get(Category, cat_id) if cat is None: raise HTTPException(404, "Категория не найдена") dup = session.exec( select(Category).where(Category.name == data.name, Category.id != cat_id) ).first() if dup: raise HTTPException(400, "Категория с таким названием уже существует") cat.name = data.name cat.sort_order = data.sort_order session.add(cat) session.commit() session.refresh(cat) return _category_dict(cat) @app.delete("/api/categories/{cat_id}") 
def delete_category( cat_id: int, session: Session = Depends(get_session), _: User = Depends(require_admin), ): cat = session.get(Category, cat_id) if cat is None: raise HTTPException(404, "Категория не найдена") # Unlink feeds from this category before deleting. for feed in session.exec( select(Feed).where(Feed.category_id == cat_id) ).all(): feed.category_id = None session.add(feed) session.delete(cat) session.commit() return {"ok": True} # --------------------------------------------------------------------------- # # API: history & stats # --------------------------------------------------------------------------- # @app.get("/api/history") def history( limit: int = 100, q: str = "", only_errors: bool = False, session: Session = Depends(get_session), _: User = Depends(require_auth), ): limit = min(500, max(1, limit)) query = select(Notification) if q.strip(): qs = q.strip() # Try FTS5 full-text search first try: fts_rows = session.exec( text("SELECT rowid FROM notification_fts WHERE notification_fts MATCH :q"), {"q": qs}, ).all() if fts_rows: matched_ids = [r[0] for r in fts_rows] query = query.where(Notification.id.in_(matched_ids)) else: raise ValueError # force fallback except Exception: like = f"%{qs}%" query = query.where( Notification.title.ilike(like) | Notification.feed_title.ilike(like) ) if only_errors: query = query.where(Notification.ok == False) # noqa: E712 notes = session.exec( query.order_by(Notification.created_at.desc()).limit(limit) ).all() return [ { "id": n.id, "feed_title": n.feed_title, "title": n.title, "link": n.link, "channels": n.channels, "ok": n.ok, "detail": n.detail, "created_at": n.created_at.isoformat(), } for n in notes ] @app.delete("/api/history") def clear_history( session: Session = Depends(get_session), _: User = Depends(require_admin) ): for note in session.exec(select(Notification)).all(): session.delete(note) session.commit() return {"ok": True} @app.get("/api/stats") def stats(session: Session = Depends(get_session), 
_: User = Depends(require_auth)): feeds = session.exec(select(Feed)).all() total_sent = session.exec( select(func.count()).select_from(Notification).where(Notification.ok == True) # noqa: E712 ).one() total_failed = session.exec( select(func.count()).select_from(Notification).where(Notification.ok == False) # noqa: E712 ).one() return { "feeds_total": len(feeds), "feeds_enabled": sum(1 for f in feeds if f.enabled), "feeds_failing": sum(1 for f in feeds if f.error_streak > 0), "notifications_sent": total_sent, "notifications_failed": total_failed, } @app.get("/api/stats/activity") def activity( days: int = 14, session: Session = Depends(get_session), _: User = Depends(require_auth), ): """Notification counts grouped by day for the last `days` days.""" days = min(90, max(1, days)) day = func.date(Notification.created_at) rows = session.exec( select( day, func.sum(func.cast(Notification.ok, Integer)), func.count(), ).group_by(day) ).all() by_day = {str(d): (int(ok or 0), int(total)) for d, ok, total in rows} out = [] today = datetime.now(timezone.utc).date() for i in range(days - 1, -1, -1): d = (today - timedelta(days=i)).isoformat() sent, total = by_day.get(d, (0, 0)) out.append({"date": d, "sent": sent, "failed": total - sent}) return out @app.post("/api/preview") async def preview( data: PreviewIn, session: Session = Depends(get_session), _: User = Depends(require_auth), ): try: return await fetch_preview(data.url, data.filter_include, data.filter_exclude) except ValueError as exc: raise HTTPException(400, str(exc)) except Exception as exc: # noqa: BLE001 raise HTTPException(502, f"Не удалось загрузить ленту: {exc}") # --------------------------------------------------------------------------- # # API: backup / restore (admin only) # --------------------------------------------------------------------------- # @app.get("/api/backup") def download_backup(_: User = Depends(require_admin)): db_path = config.DATA_DIR / "app.db" if not db_path.exists(): raise 
HTTPException(404, "База данных не найдена") now = datetime.now().strftime("%Y-%m-%d") return FileResponse( db_path, media_type="application/octet-stream", filename=f"rss-ntfy-backup-{now}.db", ) @app.post("/api/backup") async def upload_backup( file: UploadFile, _: User = Depends(require_admin), ): import shutil if not file.filename or not file.filename.endswith(".db"): raise HTTPException(400, "Ожидается файл .db") tmp_path = config.DATA_DIR / "restore_tmp.db" content_bytes = await file.read() tmp_path.write_bytes(content_bytes) try: import sqlite3 conn = sqlite3.connect(str(tmp_path)) tables = { row[0] for row in conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall() } required = {"feed", "settings", "seenentry"} missing = required - tables if missing: raise HTTPException( 400, f"В файле не хватает таблиц: {', '.join(sorted(missing))}" ) conn.close() db_path = config.DATA_DIR / "app.db" backup_path = ( config.DATA_DIR / f"app.db.bak-{datetime.now().strftime('%Y%m%d%H%M%S')}" ) if db_path.exists(): shutil.copy2(db_path, backup_path) shutil.copy2(tmp_path, db_path) except HTTPException: raise except Exception as exc: raise HTTPException(400, f"Невалидный файл: {exc}") finally: if tmp_path.exists(): tmp_path.unlink(missing_ok=True) scheduler.shutdown() with Session(engine) as session: interval = get_settings(session).check_interval scheduler.start(interval) return {"ok": True} # --------------------------------------------------------------------------- # # API: settings # --------------------------------------------------------------------------- # @app.get("/api/settings") def read_settings(session: Session = Depends(get_session), _: User = Depends(require_auth)): s = get_settings(session) return { "default_ntfy_server": s.default_ntfy_server, "default_ntfy_token": s.default_ntfy_token, "default_ntfy_username": s.default_ntfy_username, "default_ntfy_password": s.default_ntfy_password, "check_interval": s.check_interval, "auth_enabled": 
s.auth_enabled, "telegram_enabled": s.telegram_enabled, "telegram_token": s.telegram_token, "telegram_chat_id": s.telegram_chat_id, "webhook_enabled": s.webhook_enabled, "webhook_url": s.webhook_url, "alerts_enabled": s.alerts_enabled, "alert_topic": s.alert_topic, "alert_threshold": s.alert_threshold, "default_priority": s.default_priority, "notification_template": s.notification_template, "proxy_url": s.proxy_url, "default_tags": s.default_tags, "default_attach_image": s.default_attach_image, "default_interval": s.default_interval, } @app.put("/api/settings") def write_settings( data: SettingsIn, session: Session = Depends(get_session), _: User = Depends(require_admin), ): s = get_settings(session) interval_changed = s.check_interval != data.check_interval if data.auth_enabled and not session.exec(select(User)).first(): raise HTTPException(400, "Создайте хотя бы одного пользователя перед включением авторизации") s.default_ntfy_server = data.default_ntfy_server.strip() or "https://ntfy.sh" s.default_ntfy_token = data.default_ntfy_token.strip() s.default_ntfy_username = data.default_ntfy_username.strip() s.default_ntfy_password = data.default_ntfy_password s.check_interval = data.check_interval s.auth_enabled = data.auth_enabled s.telegram_enabled = data.telegram_enabled s.telegram_token = data.telegram_token.strip() s.telegram_chat_id = data.telegram_chat_id.strip() s.webhook_enabled = data.webhook_enabled s.webhook_url = data.webhook_url.strip() s.alerts_enabled = data.alerts_enabled s.alert_topic = data.alert_topic.strip() s.alert_threshold = data.alert_threshold s.default_priority = data.default_priority s.default_tags = data.default_tags.strip() s.default_attach_image = data.default_attach_image s.default_interval = data.default_interval s.notification_template = data.notification_template s.proxy_url = data.proxy_url.strip() session.add(s) session.commit() if interval_changed: scheduler.reschedule(data.check_interval) return {"ok": True} # 
# --------------------------------------------------------------------------- #
# API: users
# --------------------------------------------------------------------------- #
def _user_dict(u: User) -> dict:
    """Serialise a user row for the JSON API (the password hash is left out)."""
    return {"id": u.id, "username": u.username, "role": u.role}


@app.get("/api/users")
def list_users(session: Session = Depends(get_session), _: User = Depends(require_admin)):
    """Return all users ordered by id."""
    users = session.exec(select(User).order_by(User.id)).all()
    return [_user_dict(u) for u in users]


@app.post("/api/users")
def create_user(
    data: UserIn,
    session: Session = Depends(get_session),
    _: User = Depends(require_admin),
):
    """Create a user.

    Raises:
        HTTPException: 400 when the username is taken or the password is
            blank.
    """
    if session.exec(select(User).where(User.username == data.username)).first():
        raise HTTPException(400, "Пользователь с таким логином уже существует")
    if not data.password.strip():
        raise HTTPException(400, "Пароль обязателен для нового пользователя")
    user = User(
        username=data.username,
        password_hash=hash_password(data.password),
        role=data.role,
    )
    session.add(user)
    session.commit()
    session.refresh(user)
    return _user_dict(user)


@app.put("/api/users/{user_id}")
def update_user(
    user_id: int,
    data: UserIn,
    session: Session = Depends(get_session),
    _: User = Depends(require_admin),
):
    """Update a user's name and role; a blank password keeps the old one.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the new
            username belongs to another user or the change would demote the
            last administrator.
    """
    user = session.get(User, user_id)
    if user is None:
        raise HTTPException(404, "Пользователь не найден")
    # Same uniqueness rule as create_user, ignoring the user being edited.
    taken = session.exec(
        select(User).where(User.username == data.username, User.id != user_id)
    ).first()
    if taken:
        raise HTTPException(400, "Пользователь с таким логином уже существует")
    # Don't allow demoting the last remaining admin.
    if user.role == "admin" and data.role != "admin":
        admins = session.exec(select(User).where(User.role == "admin")).all()
        if len(admins) <= 1:
            raise HTTPException(400, "Нельзя понизить последнего администратора")
    user.username = data.username
    user.role = data.role
    if data.password.strip():
        user.password_hash = hash_password(data.password)
    session.add(user)
    session.commit()
    session.refresh(user)
    return _user_dict(user)


@app.delete("/api/users/{user_id}")
def delete_user(
    user_id: int,
    request: Request,
    session: Session = Depends(get_session),
    me: User = Depends(require_admin),
):
    """Delete a user other than the caller.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the caller
            targets themselves or the last administrator.
    """
    user = session.get(User, user_id)
    if user is None:
        raise HTTPException(404, "Пользователь не найден")
    if user.id == me.id:
        raise HTTPException(400, "Нельзя удалить самого себя")
    if user.role == "admin":
        admins = session.exec(select(User).where(User.role == "admin")).all()
        if len(admins) <= 1:
            raise HTTPException(400, "Нельзя удалить последнего администратора")
    session.delete(user)
    session.commit()
    return {"ok": True}


# --------------------------------------------------------------------------- #
# API: articles (RSS reader)
# --------------------------------------------------------------------------- #
# Name of the category that also collects feeds without any category.
_GENERAL_CATEGORY = "Общее"


def _is_general_category(session: Session, category_id: int) -> bool:
    """Return whether *category_id* is the id of the "Общее" category."""
    general = session.exec(
        select(Category).where(Category.name == _GENERAL_CATEGORY)
    ).first()
    return general is not None and category_id == general.id


def _feed_ids_in_category(
    session: Session, category_id: int, *, with_uncategorized: bool
) -> list[int]:
    """Return ids of feeds in a category, optionally adding uncategorized feeds."""
    feed_ids = list(
        session.exec(select(Feed.id).where(Feed.category_id == category_id)).all()
    )
    if with_uncategorized:
        feed_ids.extend(
            session.exec(
                select(Feed.id).where(Feed.category_id == None)  # noqa: E711
            ).all()
        )
    return feed_ids


def _excerpt(body: str | None, limit: int = 300) -> str:
    """Return at most *limit* characters of *body*, adding "..." when cut."""
    content = body or ""
    return content[:limit] + ("..." if len(content) > limit else "")


@app.get("/api/articles")
def list_articles(
    category_id: int | None = None,
    feed_id: int | None = None,
    unread: bool = False,
    q: str = "",
    limit: int = 50,
    offset: int = 0,
    session: Session = Depends(get_session),
    _: User = Depends(require_auth),
):
    """Return a page of articles, newest first, with shortened bodies.

    Args:
        category_id: Only articles from feeds in this category; the "Общее"
            category also includes feeds with no category.
        feed_id: Only articles from this feed.
        unread: When true, only unread articles.
        q: Case-insensitive substring searched in the title and body.
        limit: Page size, clamped to 1..200.
        offset: Number of rows to skip; negative values are treated as 0.
    """
    limit = min(200, max(1, limit))
    offset = max(0, offset)
    query = select(Article)

    if category_id is not None:
        feed_ids = _feed_ids_in_category(
            session,
            category_id,
            with_uncategorized=_is_general_category(session, category_id),
        )
        if not feed_ids:
            return []
        query = query.where(Article.feed_id.in_(feed_ids))

    if feed_id is not None:
        query = query.where(Article.feed_id == feed_id)
    if unread:
        query = query.where(Article.is_read == False)  # noqa: E712
    needle = q.strip()
    if needle:
        like = f"%{needle}%"
        query = query.where(
            Article.title.ilike(like) | Article.body.ilike(like)
        )

    articles = session.exec(
        query.order_by(Article.created_at.desc())
        .offset(offset)
        .limit(limit)
    ).all()
    return [
        {
            "id": a.id,
            "feed_id": a.feed_id,
            "feed_title": a.feed_title,
            "title": a.title,
            # A missing body becomes an empty excerpt instead of raising.
            "body": _excerpt(a.body),
            "link": a.link,
            "image": a.image,
            "published_at": a.published_at.isoformat() if a.published_at else None,
            "is_read": a.is_read,
            "created_at": a.created_at.isoformat(),
        }
        for a in articles
    ]


@app.get("/api/articles/stats")
def article_stats(
    session: Session = Depends(get_session),
    _: User = Depends(require_auth),
):
    """Unread article counts per category."""
    cats = session.exec(
        select(Category).order_by(Category.sort_order, Category.id)
    ).all()
    result = []
    for cat in cats:
        feed_ids = _feed_ids_in_category(
            session, cat.id, with_uncategorized=cat.name == _GENERAL_CATEGORY
        )
        count = 0
        if feed_ids:
            count = session.exec(
                select(func.count())
                .select_from(Article)
                .where(
                    Article.feed_id.in_(feed_ids),
                    Article.is_read == False,  # noqa: E712
                )
            ).one()
        result.append(
            {"category_id": cat.id, "category_name": cat.name, "unread": count}
        )
    return result


@app.put("/api/articles/read-all")
def mark_all_read(
    category_id: int | None = None,
    feed_id: int | None = None,
    session: Session = Depends(get_session),
    _: User = Depends(require_auth),
):
    """Mark unread articles as read, optionally limited to a category or feed.

    Returns:
        ``{"marked": n}`` with the number of articles that were updated.
    """
    query = select(Article).where(Article.is_read == False)  # noqa: E712
    if category_id is not None:
        feed_ids = _feed_ids_in_category(
            session,
            category_id,
            with_uncategorized=_is_general_category(session, category_id),
        )
        if not feed_ids:
            return {"marked": 0}
        query = query.where(Article.feed_id.in_(feed_ids))
    if feed_id is not None:
        query = query.where(Article.feed_id == feed_id)

    articles = session.exec(query).all()
    for a in articles:
        a.is_read = True
        session.add(a)
    session.commit()
    return {"marked": len(articles)}


@app.get("/api/articles/{article_id}")
def get_article(
    article_id: int,
    session: Session = Depends(get_session),
    _: User = Depends(require_auth),
):
    """Return one article in full and mark it as read.

    Raises:
        HTTPException: 404 when the article does not exist.
    """
    a = session.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "Статья не найдена")
    # Mark as read on view
    if not a.is_read:
        a.is_read = True
        session.add(a)
        session.commit()
        session.refresh(a)
    return {
        "id": a.id,
        "feed_id": a.feed_id,
        "feed_title": a.feed_title,
        "title": a.title,
        "body": a.body,
        "full_html": a.full_html,
        "link": a.link,
        "image": a.image,
        "published_at": a.published_at.isoformat() if a.published_at else None,
        "is_read": a.is_read,
        "created_at": a.created_at.isoformat(),
    }


@app.put("/api/articles/{article_id}/read")
def mark_read(
    article_id: int,
    session: Session = Depends(get_session),
    _: User = Depends(require_auth),
):
    """Mark a single article as read.

    Raises:
        HTTPException: 404 when the article does not exist.
    """
    a = session.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "Статья не найдена")
    a.is_read = True
    session.add(a)
    session.commit()
    return {"ok": True}


# --------------------------------------------------------------------------- #
# API: test notification
# --------------------------------------------------------------------------- #
@app.post("/api/test")
async def test_notification(
    data: TestIn,
    session: Session = Depends(get_session),
    _: User = Depends(require_auth),
):
    """Publish a test message to the given ntfy topic.

    Raises:
        HTTPException: 400 when the topic is blank, 502 when publishing
            fails.
    """
    s = get_settings(session)
    server = data.server.strip() or s.default_ntfy_server
    # Publish to the same stripped topic that was validated.
    topic = data.topic.strip()
    if not topic:
        raise HTTPException(400, "Укажите тему")
    # Prefer auth typed in the form right now; fall back to saved defaults so
    # the test works whether or not settings were saved first.
    token = s.default_ntfy_token if data.token is None else data.token.strip()
    username = s.default_ntfy_username if data.username is None else data.username.strip()
    password = s.default_ntfy_password if data.password is None else data.password
    try:
        await ntfy.publish(
            server=server,
            topic=topic,
            title="RSS to ntfy",
            message="Тестовое уведомление — всё работает!",
            tags="white_check_mark",
            priority=3,
            token=token,
            username=username,
            password=password,
        )
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(502, f"Не удалось отправить: {exc}") from exc
    return {"ok": True, "sent_to": f"{server.rstrip('/')}/{topic}"}


@app.exception_handler(HTTPException)
async def http_exc_handler(request: Request, exc: HTTPException):
    """Render ``HTTPException`` as ``{"detail": ...}`` JSON, keeping its headers."""
    return JSONResponse(
        {"detail": exc.detail},
        status_code=exc.status_code,
        headers=exc.headers,
    )