Files
rss-ntfy/app/delivery.py
T
dimon bf52bc3079
build-and-push / docker (push) Has been cancelled
RSS → ntfy bridge with modern web UI
Features: feed CRUD, per-feed ntfy target (incl. private servers),
Telegram/webhook channels, keyword filters, image attachments,
per-feed intervals, OPML import/export, notification history & stats,
users with roles, admin alerts, RU/EN i18n, light/dark theme,
notification preview, history search, activity chart. Dockerized.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 21:11:57 +08:00

152 lines
4.8 KiB
Python

"""Alternative delivery channels and a unified dispatcher.
A single feed entry can fan out to several channels: ntfy (always, if a topic
is set), Telegram, and a generic webhook. Each channel is independent — one
failing does not block the others. dispatch() returns which channels succeeded
and an error string describing any failures (for the history log).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
import httpx
from . import ntfy
from .models import Feed, Settings
log = logging.getLogger("delivery")
@dataclass
class Message:
    """A single feed entry, normalized for delivery to any channel.

    ``source``, ``title`` and ``body`` are required; ``link`` and ``image``
    default to the empty string.
    """

    # Title of the feed the entry belongs to.
    source: str
    # Title of the entry itself.
    title: str
    # Plain-text summary of the entry.
    body: str
    # Link associated with the entry; empty by default.
    link: str = ""
    # Image URL, if any; empty by default.
    image: str = ""
@dataclass
class DispatchResult:
    """Outcome of one fan-out: which channels delivered and what failed."""

    # Names of the channels that succeeded.
    channels: list[str] = field(default_factory=list)
    # One text entry per failure.
    errors: list[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        """True only when at least one channel succeeded and nothing failed."""
        delivered = bool(self.channels)
        return delivered and not self.errors

    @property
    def detail(self) -> str:
        """Every error entry joined into a single ``"; "``-separated string."""
        separator = "; "
        return separator.join(self.errors)
async def _send_telegram(settings: Settings, msg: Message) -> None:
token = settings.telegram_token.strip()
chat_id = settings.telegram_chat_id.strip()
if not token or not chat_id:
raise ValueError("Telegram не настроен (токен/chat_id)")
text = f"<b>{_esc(msg.title)}</b>"
if msg.source:
text = f"📡 <i>{_esc(msg.source)}</i>\n{text}"
if msg.body:
text += f"\n\n{_esc(msg.body[:600])}"
if msg.link:
text += f'\n\n<a href="{_esc(msg.link)}">Открыть →</a>'
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.post(
f"https://api.telegram.org/bot{token}/sendMessage",
json={
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": False,
},
)
resp.raise_for_status()
async def _send_webhook(settings: Settings, feed: Feed, msg: Message) -> None:
url = settings.webhook_url.strip()
if not url:
raise ValueError("Webhook URL не задан")
payload = {
"feed": msg.source,
"feed_url": feed.url,
"title": msg.title,
"body": msg.body,
"link": msg.link,
"image": msg.image,
}
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
def _esc(text: str) -> str:
return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
def _failure_text(exc: BaseException) -> str:
    """Describe *exc* for the error list, falling back to its class name."""
    # Some exceptions carry an empty message; without the fallback the entry
    # would read e.g. "telegram: " and say nothing about the failure.
    return str(exc) or type(exc).__name__


async def dispatch(feed: Feed, settings: Settings, msg: Message) -> DispatchResult:
    """Send a message across every channel enabled for this feed.

    Channels are tried in order (ntfy, Telegram, webhook) and are independent
    of each other: an exception in one is recorded in ``errors`` and the next
    channel is still attempted.

    Args:
        feed: Per-feed configuration (ntfy target and channel switches).
        settings: Global configuration (default server, Telegram, webhook).
        msg: The entry to deliver.

    Returns:
        A ``DispatchResult`` listing the channels that succeeded and one
        ``"<channel>: <reason>"`` entry per failure.
    """
    result = DispatchResult()
    server = feed.ntfy_server.strip() or settings.default_ntfy_server
    full_title = f"{msg.source}: {msg.title}" if msg.source else msg.title

    # --- ntfy (default channel; requires a topic) ---
    # Publish the stripped value that was validated, not the raw field, so
    # stray surrounding whitespace never reaches the publish call.
    topic = feed.ntfy_topic.strip()
    if topic:
        try:
            await ntfy.publish(
                server=server,
                topic=topic,
                title=full_title,
                message=msg.body or "(нет описания)",
                click=msg.link,
                tags=feed.tags,
                priority=feed.priority,
                attach=msg.image if feed.attach_image else "",
                token=feed.ntfy_token,
                username=feed.ntfy_username,
                password=feed.ntfy_password,
            )
        except Exception as exc:  # noqa: BLE001
            result.errors.append(f"ntfy: {_failure_text(exc)}")
        else:
            result.channels.append("ntfy")

    # --- Telegram ---
    if feed.to_telegram and settings.telegram_enabled:
        try:
            await _send_telegram(settings, msg)
        except Exception as exc:  # noqa: BLE001
            result.errors.append(f"telegram: {_failure_text(exc)}")
        else:
            result.channels.append("telegram")

    # --- Webhook ---
    if feed.to_webhook and settings.webhook_enabled:
        try:
            await _send_webhook(settings, feed, msg)
        except Exception as exc:  # noqa: BLE001
            result.errors.append(f"webhook: {_failure_text(exc)}")
        else:
            result.channels.append("webhook")

    return result
async def send_admin_alert(settings: Settings, text: str) -> None:
    """Best-effort health alert to the admin ntfy topic.

    Does nothing when alerts are disabled or the alert topic is blank.
    A publish failure is logged as a warning and never propagated.

    Args:
        settings: Global configuration (alert switch, topic, default server).
        text: Body of the alert.
    """
    if not settings.alerts_enabled:
        return
    # Publish the stripped value that was validated, not the raw field.
    topic = settings.alert_topic.strip()
    if not topic:
        return

    try:
        await ntfy.publish(
            server=settings.default_ntfy_server,
            topic=topic,
            title="RSS to ntfy — alert",
            message=text,
            tags="warning",
            priority=4,
        )
    except Exception as exc:  # noqa: BLE001
        # Fall back to the class name so an exception with an empty message
        # still produces a useful log line.
        log.warning("admin alert failed: %s", str(exc) or type(exc).__name__)