Files
dimon 3f9b108482 RSS/Atom -> ntfy bridge with web UI, OPML import/export and RU/EN localization
Web-managed fork of nurefexc/rss-bridge-ntfy: Flask UI + REST API, background
sync engine (SQLite dedup, quiet hours, filters, flood protection, images),
OPML import/export and switchable interface/notification language.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:34:53 +08:00

354 lines
13 KiB
Python

"""RSS/Atom -> ntfy bridge engine.
A single background thread runs sync cycles on an interval. For every enabled
feed it parses entries, de-duplicates them via a SQLite history table, applies
include/exclude filters, computes a (possibly quiet-hours adjusted) priority and
posts a Markdown notification to the feed's ntfy topic.
The engine exposes status and an in-memory log ring buffer so the web UI can
display what is happening without reading files.
"""
import hashlib
import logging
import re
import sqlite3
import threading
import time
from collections import deque
from contextlib import closing
from datetime import datetime, timezone
from urllib.parse import urlparse, urlunparse
from zoneinfo import ZoneInfo

import feedparser
import requests
from bs4 import BeautifulSoup

import store as store_mod
# Module-wide logger; every message from this module is emitted under the
# "bridge" logger name.
logger = logging.getLogger("bridge")
class RingBufferHandler(logging.Handler):
    """Keeps the most recent log records in memory for the web UI.

    Every record is stored as a dict with three keys: ``time`` (ISO-8601
    timestamp in UTC), ``level`` (the level name) and ``message`` (the fully
    interpolated message). Once ``capacity`` records are buffered, each new
    record pushes out the oldest one.
    """

    def __init__(self, capacity=300):
        """Create the handler with room for *capacity* records."""
        super().__init__()
        self.buffer = deque(maxlen=capacity)

    def emit(self, record):
        """Append *record* to the buffer.

        A record that cannot be rendered (for example mismatched %-style
        arguments) is reported through ``handleError`` instead of raising
        into the code that issued the log call.
        """
        try:
            entry = {
                "time": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
            }
        except Exception:
            # Logging must never break the caller; delegate to the standard
            # handler error hook and drop the record.
            self.handleError(record)
            return
        self.buffer.append(entry)

    def records(self):
        """Return a list snapshot of the buffered records, oldest first."""
        # handle() calls emit() while holding the handler lock; take the same
        # lock so the snapshot cannot interleave with an append.
        self.acquire()
        try:
            return list(self.buffer)
        finally:
            self.release()
class Engine:
    """Background sync engine that turns new feed entries into ntfy messages.

    One daemon thread (see ``start``) repeatedly runs ``sync``. A sync cycle
    walks every enabled feed, skips entries already recorded in the
    ``seen_entries`` SQLite table, applies the feed's include/exclude regexes
    and posts the remaining entries to the feed's ntfy topic.

    Progress is exposed through the ``status`` dict and the ``ring`` log
    handler.
    """

    # Fallback and lower bound (seconds) for the pause between sync cycles.
    _DEFAULT_INTERVAL = 600
    _MIN_INTERVAL = 30

    def __init__(self, store: "store_mod.Store"):
        """Bind the engine to *store* and make sure the history table exists."""
        self.store = store
        self._stop = threading.Event()   # set to ask the loop to exit
        self._wake = threading.Event()   # set to cut the inter-cycle wait short
        self._force = False              # one-shot "sync even if disabled" flag
        self._thread = None
        self._sync_lock = threading.Lock()  # guarantees one sync at a time
        self.ring = RingBufferHandler()
        self.ring.setFormatter(logging.Formatter("%(message)s"))
        self.status = {
            "running": False,
            "syncing": False,
            "last_sync": None,
            "last_sync_ok": None,
            "next_sync": None,
            "last_error": None,
            "sent_total": 0,
            "sent_last_cycle": 0,
        }
        self._init_db()

    # ---- lifecycle ---------------------------------------------------------
    def start(self):
        """Start the background thread unless it is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._loop, name="engine", daemon=True)
        self._thread.start()
        self.status["running"] = True
        logger.info("Engine started")

    def stop(self):
        """Ask the background thread to exit and wait up to 10 s for it."""
        self._stop.set()
        self._wake.set()  # interrupt the inter-cycle wait immediately
        if self._thread:
            self._thread.join(timeout=10)
        self.status["running"] = False
        logger.info("Engine stopped")

    def trigger_sync(self):
        """Request an immediate sync cycle even if the engine is paused."""
        self._force = True
        self._wake.set()

    # ---- main loop ---------------------------------------------------------
    @classmethod
    def _sync_interval(cls, settings):
        """Return the wait between cycles in seconds (never below the minimum).

        A missing or non-numeric ``sync_interval`` falls back to the default
        instead of raising.
        """
        try:
            return max(cls._MIN_INTERVAL, int(settings.get("sync_interval", cls._DEFAULT_INTERVAL)))
        except (TypeError, ValueError):
            return cls._DEFAULT_INTERVAL

    def _loop(self):
        """Thread body: run a sync cycle, then sleep until the next one is due."""
        while not self._stop.is_set():
            interval = self._DEFAULT_INTERVAL
            try:
                # Reading settings sits inside the guard too: a store failure
                # must delay the cycle, not terminate the thread.
                settings = self.store.get_settings()
                interval = self._sync_interval(settings)
                if settings["enabled"] or self._force:
                    self._force = False
                    self.sync(settings)
            except Exception as exc:  # never let the loop die
                logger.exception("Sync cycle crashed: %s", exc)
                self.status["last_error"] = str(exc)
            self.status["next_sync"] = datetime.now(timezone.utc).timestamp() + interval
            self._wake.wait(timeout=interval)
            self._wake.clear()

    # ---- database ----------------------------------------------------------
    def _connect(self):
        """Open a new connection to the history database in WAL mode.

        The caller owns the connection and is responsible for closing it.
        """
        conn = sqlite3.connect(store_mod.DB_PATH, timeout=30)
        try:
            conn.execute("PRAGMA journal_mode=WAL")
        except sqlite3.Error:
            conn.close()
            raise
        return conn

    def _init_db(self):
        """Create the ``seen_entries`` table if it does not exist yet."""
        # ``with conn`` only commits/rolls back; closing() releases the handle.
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS seen_entries (
                    hash TEXT PRIMARY KEY,
                    topic TEXT,
                    created_at TEXT
                )
                """
            )

    def _seen(self, conn, entry_hash):
        """Return True if *entry_hash* is already recorded."""
        cur = conn.execute("SELECT 1 FROM seen_entries WHERE hash=?", (entry_hash,))
        return cur.fetchone() is not None

    def _mark_seen(self, conn, entry_hash, topic):
        """Record *entry_hash* for *topic* and commit immediately."""
        conn.execute(
            "INSERT OR IGNORE INTO seen_entries (hash, topic, created_at) VALUES (?,?,?)",
            (entry_hash, topic, datetime.now(timezone.utc).isoformat()),
        )
        conn.commit()

    def history_count(self):
        """Return the number of recorded entries, or 0 on a database error."""
        try:
            with closing(self._connect()) as conn:
                return conn.execute("SELECT COUNT(*) FROM seen_entries").fetchone()[0]
        except sqlite3.Error:
            return 0

    def clear_history(self):
        """Delete every recorded entry."""
        with closing(self._connect()) as conn, conn:
            conn.execute("DELETE FROM seen_entries")

    # ---- helpers -----------------------------------------------------------
    @staticmethod
    def clean_url(url):
        """Strip query/params/fragment so tracking params don't change the hash."""
        try:
            parts = urlparse(url)
            return urlunparse((parts.scheme, parts.netloc, parts.path, "", "", ""))
        except ValueError:
            return url

    def _entry_hash(self, topic, entry):
        """Return the SHA-256 hex digest identifying *entry* within *topic*.

        The identity is the entry id, else its cleaned link. Entries that
        carry neither fall back to title plus published date so that they do
        not all collapse onto a single hash.
        """
        ident = entry.get("id") or self.clean_url(entry.get("link") or "")
        if not ident:
            ident = f"{entry.get('title') or ''} {entry.get('published') or ''}".strip()
        raw = f"{topic}_{ident}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def _tz(self, settings):
        """Return the configured time zone, or UTC if it cannot be loaded."""
        try:
            return ZoneInfo(settings.get("tz", "UTC"))
        except Exception:
            return timezone.utc

    def get_dynamic_priority(self, feed, settings):
        """Return the priority to use for *feed* right now.

        ``quiet_hours`` is read as the first two integers found in the string
        (e.g. ``"22-7"``), both taken modulo 24. Inside that window the feed's
        ``quiet_priority`` (default 1) is returned, otherwise its ``priority``
        (default 3). A missing or malformed window leaves the base priority.
        """
        base = int(feed.get("priority", 3))
        quiet = (feed.get("quiet_hours") or "").strip()
        if not quiet:
            return base
        nums = re.findall(r"\d+", quiet)
        if len(nums) < 2:
            logger.warning("Invalid quiet_hours '%s' for feed %s", quiet, feed.get("name"))
            return base
        raw_start, raw_end = int(nums[0]), int(nums[1])
        start, end = raw_start % 24, raw_end % 24
        hour = datetime.now(self._tz(settings)).hour
        if start == end:
            # "0-24" wraps onto 0-0 and means the whole day; a literal "8-8"
            # is an empty window.
            is_quiet = raw_start != raw_end
        elif start > end:  # overnight window, e.g. 22-7
            is_quiet = hour >= start or hour < end
        else:
            is_quiet = start <= hour < end
        if is_quiet:
            return int(feed.get("quiet_priority", 1))
        return base

    @staticmethod
    def should_filter(feed, title, summary):
        """Return True if the entry must be dropped.

        The entry is dropped when ``exclude_regex`` matches the title/summary
        text, or when ``include_regex`` is set and does not match. Matching is
        case-insensitive. An invalid pattern is logged and the entry is kept.
        """
        text = f"{title} {summary}".lower()
        exclude = (feed.get("exclude_regex") or "").strip()
        include = (feed.get("include_regex") or "").strip()
        try:
            # IGNORECASE is required because only the text is lowercased: a
            # pattern such as "Sponsored" could otherwise never match.
            if exclude and re.search(exclude, text, re.IGNORECASE):
                return True
            if include and not re.search(include, text, re.IGNORECASE):
                return True
        except re.error as exc:
            logger.warning("Bad regex on feed %s: %s", feed.get("name"), exc)
        return False

    def clean_html_content(self, html, max_len):
        """Return (text, image_url) extracted from an HTML snippet.

        ``text`` is the visible text, cut to *max_len* characters plus an
        ellipsis when longer. ``image_url`` is the ``src`` of the first
        ``<img>`` or None.
        """
        if not html:
            return "", None
        soup = BeautifulSoup(html, "lxml")
        image = None
        img = soup.find("img")
        if img and img.get("src"):
            image = img["src"]
        text = soup.get_text(separator=" ", strip=True)
        if len(text) > max_len:
            # Mark the cut so readers can tell the description is truncated.
            text = text[:max_len].rstrip() + "…"
        return text, image

    # ---- ntfy --------------------------------------------------------------
    @staticmethod
    def _header_value(value):
        """Return *value* as a single-line, UTF-8 encoded header value.

        Whitespace runs (including line breaks) are collapsed to one space and
        the ends are trimmed; the result is bytes so non-ASCII text is sent as
        UTF-8.
        """
        return " ".join(str(value).split()).encode("utf-8")

    def send_ntfy(self, settings, topic, title, message, link,
                  priority, icon=None, attach=None, delay=None):
        """POST one Markdown notification to ``<ntfy_url>/<topic>``.

        Optional headers (Click, Authorization, Icon, Attach, Delay) are only
        sent when the corresponding value is truthy. Returns the response;
        raises ``requests`` exceptions on transport errors or a non-2xx status.
        """
        url = settings["ntfy_url"].rstrip("/") + "/" + topic
        headers = {
            "User-Agent": settings.get("user_agent", store_mod.DEFAULT_USER_AGENT),
            "Title": self._header_value(title),  # bytes -> ntfy decodes UTF-8
            "Priority": str(priority),
            "Tags": "newspaper",
            "Markdown": "yes",
        }
        if link:
            headers["Click"] = self._header_value(link)
        if settings.get("ntfy_token"):
            headers["Authorization"] = f"Bearer {settings['ntfy_token']}"
        if icon:
            headers["Icon"] = self._header_value(icon)
        if attach:
            headers["Attach"] = self._header_value(attach)
        if delay:
            headers["Delay"] = delay
        resp = requests.post(url, data=message.encode("utf-8"),
                             headers=headers, timeout=30)
        resp.raise_for_status()
        return resp

    def send_test(self, topic, title="Test", message="rss-bridge-ntfy test notification"):
        """Send a priority-3 test notification to *topic* using stored settings."""
        settings = self.store.get_settings()
        self.send_ntfy(settings, topic, title, message, link=None, priority=3)

    # ---- sync --------------------------------------------------------------
    def sync(self, settings=None):
        """Run one sync cycle over all enabled feeds.

        Returns immediately if another cycle holds the sync lock. A feed that
        raises is logged and skipped so the remaining feeds are still
        processed; ``last_sync_ok`` is False if any feed crashed. Never raises.
        """
        if not self._sync_lock.acquire(blocking=False):
            logger.info("Sync already in progress, skipping trigger")
            return
        try:
            settings = settings or self.store.get_settings()
            self.status["syncing"] = True
            self.status["last_error"] = None
            sent_cycle = 0
            crashed = 0
            feeds = [f for f in self.store.get_feeds() if f["enabled"] and f["url"] and f["topic"]]
            logger.info("Sync started: %d active feed(s)", len(feeds))
            with closing(self._connect()) as conn:
                for feed in feeds:
                    try:
                        sent_cycle += self._process_feed(conn, settings, feed)
                    except Exception as exc:  # isolate one bad feed from the rest
                        crashed += 1
                        self.status["last_error"] = str(exc)
                        logger.exception(
                            "Feed %s crashed: %s", feed.get("name") or feed.get("url"), exc
                        )
            self.status["sent_last_cycle"] = sent_cycle
            self.status["sent_total"] += sent_cycle
            self.status["last_sync"] = datetime.now(timezone.utc).isoformat()
            self.status["last_sync_ok"] = crashed == 0
            logger.info("Sync finished: %d notification(s) sent", sent_cycle)
        except Exception as exc:
            self.status["last_sync_ok"] = False
            self.status["last_error"] = str(exc)
            logger.exception("Sync failed: %s", exc)
        finally:
            self.status["syncing"] = False
            self._sync_lock.release()

    def _process_feed(self, conn, settings, feed):
        """Fetch *feed*, notify about its unseen entries and return the count sent.

        Entries dropped by the filters are marked as seen without a
        notification. An entry whose notification fails is left unmarked, so
        it is picked up again by a later cycle.
        """
        name = feed.get("name") or feed.get("url")
        # NOTE(review): no timeout is passed to feedparser here; confirm that
        # a stalled server cannot block the engine thread indefinitely.
        try:
            parsed = feedparser.parse(
                feed["url"], agent=settings.get("user_agent", store_mod.DEFAULT_USER_AGENT)
            )
        except Exception as exc:
            logger.warning("Failed to fetch feed %s: %s", name, exc)
            return 0
        if parsed.bozo and not parsed.entries:
            logger.warning("Feed %s returned no usable entries (%s)", name, parsed.get("bozo_exception"))
            return 0

        batch_limit = int(settings.get("batch_limit", 3))
        max_len = int(settings.get("max_desc_length", 250))
        flood = bool(settings.get("flood_protection", True))

        # New (unseen) entries, oldest first, capped to the batch limit.
        new_entries = []
        for entry in parsed.entries:
            entry_hash = self._entry_hash(feed["topic"], entry)
            if not self._seen(conn, entry_hash):
                new_entries.append((entry_hash, entry))
        new_entries = list(reversed(new_entries))[:batch_limit]

        # Per-feed values that do not depend on the individual entry.
        priority = self.get_dynamic_priority(feed, settings)
        read_more = {"ru": "Читать на сайте", "en": "Read on website"}.get(
            settings.get("language", "ru"), "Читать на сайте")

        sent = 0
        for entry_hash, entry in new_entries:
            title = entry.get("title") or "(no title)"
            link = entry.get("link", "")
            raw_html = ""
            if entry.get("content"):
                raw_html = entry["content"][0].get("value", "")
            raw_html = raw_html or entry.get("summary", "")
            description, image = self.clean_html_content(raw_html, max_len)
            if self.should_filter(feed, title, description):
                self._mark_seen(conn, entry_hash, feed["topic"])  # drop quietly
                continue

            # Flood protection: stagger low priority items within the batch.
            # The slot is the number already sent, so filtered entries do not
            # push the first real notification into the future.
            delay = None
            if flood and priority < 4 and sent > 0:
                delay = f"{sent * 5}m"

            body = f"**{feed.get('name', '')}**\n\n{description}"
            if link:
                body += f"\n\n[{read_more}]({link})"
            try:
                self.send_ntfy(
                    settings, feed["topic"], title, body, link,
                    priority, icon=feed.get("icon") or None,
                    attach=image, delay=delay,
                )
                self._mark_seen(conn, entry_hash, feed["topic"])
                sent += 1
                logger.info("Sent '%s' -> topic '%s' (priority %s)", title, feed["topic"], priority)
            except Exception as exc:
                logger.warning("Failed to send '%s' to '%s': %s", title, feed["topic"], exc)
        return sent