"""RSS/Atom -> ntfy bridge engine. A single background thread runs sync cycles on an interval. For every enabled feed it parses entries, de-duplicates them via a SQLite history table, applies include/exclude filters, computes a (possibly quiet-hours adjusted) priority and posts a Markdown notification to the feed's ntfy topic. The engine exposes status and an in-memory log ring buffer so the web UI can display what is happening without reading files. """ import hashlib import logging import re import sqlite3 import threading import time from collections import deque from datetime import datetime, timezone from urllib.parse import urlparse, urlunparse from zoneinfo import ZoneInfo import feedparser import requests from bs4 import BeautifulSoup import store as store_mod logger = logging.getLogger("bridge") class RingBufferHandler(logging.Handler): """Keeps the most recent log records in memory for the web UI.""" def __init__(self, capacity=300): super().__init__() self.buffer = deque(maxlen=capacity) def emit(self, record): self.buffer.append( { "time": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), "level": record.levelname, "message": record.getMessage(), } ) def records(self): return list(self.buffer) class Engine: def __init__(self, store: store_mod.Store): self.store = store self._stop = threading.Event() self._wake = threading.Event() self._force = False self._thread = None self._sync_lock = threading.Lock() self.ring = RingBufferHandler() self.ring.setFormatter(logging.Formatter("%(message)s")) self.status = { "running": False, "syncing": False, "last_sync": None, "last_sync_ok": None, "next_sync": None, "last_error": None, "sent_total": 0, "sent_last_cycle": 0, } self._init_db() # ---- lifecycle --------------------------------------------------------- def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = threading.Thread(target=self._loop, name="engine", daemon=True) self._thread.start() self.status["running"] = True logger.info("Engine started") def stop(self): self._stop.set() self._wake.set() if self._thread: self._thread.join(timeout=10) self.status["running"] = False logger.info("Engine stopped") def trigger_sync(self): """Request an immediate sync cycle even if the engine is paused.""" self._force = True self._wake.set() # ---- main loop --------------------------------------------------------- def _loop(self): while not self._stop.is_set(): settings = self.store.get_settings() if settings["enabled"] or self._force: self._force = False try: self.sync(settings) except Exception as exc: # never let the loop die logger.exception("Sync cycle crashed: %s", exc) self.status["last_error"] = str(exc) interval = max(30, int(settings.get("sync_interval", 600))) self.status["next_sync"] = datetime.now(timezone.utc).timestamp() + interval self._wake.wait(timeout=interval) self._wake.clear() # ---- database ---------------------------------------------------------- def _connect(self): conn = sqlite3.connect(store_mod.DB_PATH, timeout=30) conn.execute("PRAGMA journal_mode=WAL") return conn def _init_db(self): with self._connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS seen_entries ( hash TEXT PRIMARY KEY, topic TEXT, created_at TEXT ) """ ) def _seen(self, conn, entry_hash): cur = conn.execute("SELECT 1 FROM seen_entries WHERE hash=?", (entry_hash,)) return cur.fetchone() is not None def _mark_seen(self, conn, entry_hash, topic): conn.execute( "INSERT OR IGNORE INTO seen_entries (hash, topic, created_at) VALUES (?,?,?)", (entry_hash, topic, datetime.now(timezone.utc).isoformat()), ) conn.commit() def history_count(self): try: with self._connect() as conn: return conn.execute("SELECT COUNT(*) FROM seen_entries").fetchone()[0] except sqlite3.Error: return 0 def clear_history(self): with self._connect() as conn: conn.execute("DELETE FROM seen_entries") conn.commit() # ---- helpers ----------------------------------------------------------- @staticmethod def clean_url(url): """Strip query/params/fragment so tracking params don't change the hash.""" try: parts = urlparse(url) return urlunparse((parts.scheme, parts.netloc, parts.path, "", "", "")) except ValueError: return url def _entry_hash(self, topic, entry): ident = entry.get("id") or self.clean_url(entry.get("link", "")) raw = f"{topic}_{ident}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _tz(self, settings): try: return ZoneInfo(settings.get("tz", "UTC")) except Exception: return timezone.utc def get_dynamic_priority(self, feed, settings): base = int(feed.get("priority", 3)) quiet = (feed.get("quiet_hours") or "").strip() if not quiet: return base nums = re.findall(r"\d+", quiet) if len(nums) < 2: logger.warning("Invalid quiet_hours '%s' for feed %s", quiet, feed.get("name")) return base start, end = int(nums[0]) % 24, int(nums[1]) % 24 hour = datetime.now(self._tz(settings)).hour if start > end: # overnight window, e.g. 22-7 is_quiet = hour >= start or hour < end else: is_quiet = start <= hour < end if is_quiet: return int(feed.get("quiet_priority", 1)) return base @staticmethod def should_filter(feed, title, summary): """Return True if the entry must be dropped.""" text = f"{title} {summary}".lower() exclude = (feed.get("exclude_regex") or "").strip() include = (feed.get("include_regex") or "").strip() try: if exclude and re.search(exclude, text): return True if include and not re.search(include, text): return True except re.error as exc: logger.warning("Bad regex on feed %s: %s", feed.get("name"), exc) return False def clean_html_content(self, html, max_len): """Return (text, image_url) extracted from an HTML snippet.""" if not html: return "", None soup = BeautifulSoup(html, "lxml") image = None img = soup.find("img") if img and img.get("src"): image = img["src"] text = soup.get_text(separator=" ", strip=True) if len(text) > max_len: text = text[:max_len].rstrip() + "…" return text, image # ---- ntfy -------------------------------------------------------------- def send_ntfy(self, settings, topic, title, message, link, priority, icon=None, attach=None, delay=None): url = settings["ntfy_url"].rstrip("/") + "/" + topic headers = { "User-Agent": settings.get("user_agent", store_mod.DEFAULT_USER_AGENT), "Title": title.encode("utf-8"), # bytes -> ntfy decodes UTF-8 "Priority": str(priority), "Tags": "newspaper", "Markdown": "yes", } if link: headers["Click"] = link if settings.get("ntfy_token"): headers["Authorization"] = f"Bearer {settings['ntfy_token']}" if icon: headers["Icon"] = icon if attach: headers["Attach"] = attach if delay: headers["Delay"] = delay resp = requests.post(url, data=message.encode("utf-8"), headers=headers, timeout=30) resp.raise_for_status() return resp def send_test(self, topic, title="Test", message="rss-bridge-ntfy test notification"): settings = self.store.get_settings() self.send_ntfy(settings, topic, title, message, link=None, priority=3) # ---- sync -------------------------------------------------------------- def sync(self, settings=None): if not self._sync_lock.acquire(blocking=False): logger.info("Sync already in progress, skipping trigger") return try: settings = settings or self.store.get_settings() self.status["syncing"] = True self.status["last_error"] = None sent_cycle = 0 feeds = [f for f in self.store.get_feeds() if f["enabled"] and f["url"] and f["topic"]] logger.info("Sync started: %d active feed(s)", len(feeds)) with self._connect() as conn: for feed in feeds: sent_cycle += self._process_feed(conn, settings, feed) self.status["sent_last_cycle"] = sent_cycle self.status["sent_total"] += sent_cycle self.status["last_sync"] = datetime.now(timezone.utc).isoformat() self.status["last_sync_ok"] = True logger.info("Sync finished: %d notification(s) sent", sent_cycle) except Exception as exc: self.status["last_sync_ok"] = False self.status["last_error"] = str(exc) logger.exception("Sync failed: %s", exc) finally: self.status["syncing"] = False self._sync_lock.release() def _process_feed(self, conn, settings, feed): name = feed.get("name") or feed.get("url") try: parsed = feedparser.parse( feed["url"], agent=settings.get("user_agent", store_mod.DEFAULT_USER_AGENT) ) except Exception as exc: logger.warning("Failed to fetch feed %s: %s", name, exc) return 0 if parsed.bozo and not parsed.entries: logger.warning("Feed %s returned no usable entries (%s)", name, parsed.get("bozo_exception")) return 0 batch_limit = int(settings.get("batch_limit", 3)) max_len = int(settings.get("max_desc_length", 250)) flood = bool(settings.get("flood_protection", True)) # New (unseen) entries, oldest first, capped to the batch limit. new_entries = [] for entry in parsed.entries: entry_hash = self._entry_hash(feed["topic"], entry) if not self._seen(conn, entry_hash): new_entries.append((entry_hash, entry)) new_entries = list(reversed(new_entries))[:batch_limit] sent = 0 for index, (entry_hash, entry) in enumerate(new_entries): title = entry.get("title", "(no title)") link = entry.get("link", "") raw_html = "" if entry.get("content"): raw_html = entry["content"][0].get("value", "") raw_html = raw_html or entry.get("summary", "") description, image = self.clean_html_content(raw_html, max_len) if self.should_filter(feed, title, description): self._mark_seen(conn, entry_hash, feed["topic"]) # drop quietly continue priority = self.get_dynamic_priority(feed, settings) # Flood protection: stagger low priority items within the batch. delay = None if flood and priority < 4 and index > 0: delay = f"{index * 5}m" read_more = {"ru": "Читать на сайте", "en": "Read on website"}.get( settings.get("language", "ru"), "Читать на сайте") body = f"**{feed.get('name', '')}**\n\n{description}" if link: body += f"\n\n[{read_more}]({link})" try: self.send_ntfy( settings, feed["topic"], title, body, link, priority, icon=feed.get("icon") or None, attach=image, delay=delay, ) self._mark_seen(conn, entry_hash, feed["topic"]) sent += 1 logger.info("Sent '%s' -> topic '%s' (priority %s)", title, feed["topic"], priority) except Exception as exc: logger.warning("Failed to send '%s' to '%s': %s", title, feed["topic"], exc) return sent