2026-06-02 21:11:57 +08:00
|
|
|
"""Background RSS polling and dispatch across channels."""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import asyncio
|
2026-06-03 20:47:46 +08:00
|
|
|
import json
|
2026-06-02 21:11:57 +08:00
|
|
|
import logging
|
|
|
|
|
import re
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from html import unescape
|
|
|
|
|
|
2026-06-03 20:47:46 +08:00
|
|
|
import httpx
|
|
|
|
|
from io import StringIO
|
|
|
|
|
|
2026-06-02 21:11:57 +08:00
|
|
|
import feedparser
|
|
|
|
|
from sqlmodel import Session, select
|
|
|
|
|
|
|
|
|
|
from . import delivery
|
|
|
|
|
from .database import engine, get_settings
|
|
|
|
|
from .delivery import Message
|
2026-06-03 20:47:46 +08:00
|
|
|
from .models import Article, DigestEntry, Feed, Notification, SeenEntry
|
2026-06-02 21:11:57 +08:00
|
|
|
|
|
|
|
|
# Logger shared by the polling/dispatch code in this module.
log = logging.getLogger("checker")

# Any HTML tag; _strip_html replaces matches with a space.
_TAG_RE = re.compile(r"<[^>]+>")

# Group 1: the src URL of an <img> tag (case-insensitive).
_IMG_RE = re.compile(
    r'<img[^>]+src=["\']([^"\']+)["\']',
    flags=re.I,
)

# Group 1: the src URL of a <video>, <iframe>, <source> or <embed> tag.
_VIDEO_RE = re.compile(
    r'<(?:video|iframe|source|embed)[^>]*src=["\']([^"\']+)["\']',
    flags=re.I,
)

# A whole <video>/<iframe>/<source>/<embed> opening tag, no capture group.
# NOTE(review): not referenced elsewhere in this file as seen — possibly used
# by another module; confirm before removing.
_ENC_VIDEO_RE = re.compile(r'<(?:video|iframe|source|embed)[^>]*>', flags=re.I)
|
|
|
|
|
|
|
|
|
|
|
|
|
def _strip_html(text: str, limit: int = 1500) -> str:
    """Convert an HTML fragment into compact plain text.

    Tags are replaced by spaces and HTML entities decoded; runs of spaces and
    tabs collapse to a single space and three or more line breaks collapse to
    one blank line. ``None`` or empty input gives "".

    Args:
        text: HTML (or plain) text to flatten.
        limit: Length above which the text is truncated; 0 disables
            truncation. A longer text is cut back to the last space within
            its first ``limit`` characters and " …" is appended.
    """
    flattened = unescape(_TAG_RE.sub(" ", text or ""))
    flattened = re.sub(r"[ \t]+", " ", flattened)
    flattened = re.sub(r"\n\s*\n\s*\n+", "\n\n", flattened).strip()

    if not limit or len(flattened) <= limit:
        return flattened

    head = flattened[:limit]
    last_space = head.rfind(" ")
    if last_space != -1:
        # Avoid cutting a word in half.
        head = head[:last_space]
    return head + " …"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _entry_uid(entry) -> str:
    """Return a stable identifier for a feed entry.

    The first truthy value among the entry's ``id``, ``guid`` and ``link``
    (in that order) is used, converted to ``str``. If none is set, the
    identifier is built as "<title>|<published>".
    """
    candidates = (entry.get(key) for key in ("id", "guid", "link"))
    uid = next((value for value in candidates if value), None)
    if uid is not None:
        return str(uid)
    return f"{entry.get('title', '')}|{entry.get('published', '')}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_image(entry) -> str:
    """Best-effort: return the first image URL found in a feed entry, or "".

    Sources are tried in order:
    1. Media RSS ``media_content`` items, then ``media_thumbnail`` items.
       Every item is considered (not only the first). Items whose
       ``medium``/``type`` mark them as something other than an image (e.g. a
       video in ``media_content``) are skipped.
    2. ``enclosure`` links with an ``image/*`` type. Enclosures without an
       href are skipped instead of ending the search.
    3. The first ``<img src=...>`` in the summary/description. When the
       summary/description is empty, the first ``content`` value is searched
       instead.
    """

    def _is_image(item) -> bool:
        # Items without medium/type info (typical for thumbnails) are accepted.
        medium = str(item.get("medium") or "").lower()
        mime = str(item.get("type") or "").lower()
        if medium and medium != "image":
            return False
        return not mime or mime.startswith("image")

    for key in ("media_content", "media_thumbnail"):
        media = entry.get(key)
        if media and isinstance(media, list):
            for item in media:
                url = item.get("url")
                if url and _is_image(item):
                    return url

    for link in entry.get("links", []):
        if link.get("rel") == "enclosure" and str(link.get("type", "")).startswith("image"):
            href = link.get("href", "")
            if href:
                return href

    html = entry.get("summary") or entry.get("description") or ""
    if not html:
        content = entry.get("content")
        if content and isinstance(content, list):
            html = content[0].get("value", "")
    if not html:
        return ""
    match = _IMG_RE.search(html)
    return match.group(1) if match else ""
|
|
|
def _extract_all_images(entry) -> list[str]:
    """Return every image URL found in a feed entry, de-duplicated in order.

    URLs are collected, in this order, from:
    1. Media RSS ``media_content`` and ``media_thumbnail`` items. Items whose
       ``medium``/``type`` mark them as something other than an image (e.g. a
       video in ``media_content``) are skipped, so they are not sent as
       pictures.
    2. ``enclosure`` links with an ``image/*`` type.
    3. ``<img src=...>`` tags in the summary/description. When the
       summary/description is empty, the first ``content`` value is searched
       instead.
    """

    def _is_image(item) -> bool:
        # Items without medium/type info (typical for thumbnails) are accepted.
        medium = str(item.get("medium") or "").lower()
        mime = str(item.get("type") or "").lower()
        if medium and medium != "image":
            return False
        return not mime or mime.startswith("image")

    urls: list[str] = []

    # media_content / media_thumbnail
    for key in ("media_content", "media_thumbnail"):
        media = entry.get(key)
        if media and isinstance(media, list):
            for item in media:
                url = item.get("url")
                if url and _is_image(item):
                    urls.append(url)

    # enclosure links with image/ type
    for link in entry.get("links", []):
        if link.get("rel") == "enclosure" and str(link.get("type", "")).startswith("image"):
            href = link.get("href", "")
            if href:
                urls.append(href)

    # <img> tags in HTML body
    html = entry.get("summary") or entry.get("description") or ""
    if not html:
        content = entry.get("content")
        if content and isinstance(content, list):
            html = content[0].get("value", "")
    if html:
        urls.extend(_IMG_RE.findall(html))

    # dict preserves insertion order -> order-preserving de-duplication
    return list(dict.fromkeys(urls))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_videos(entry) -> list[str]:
    """Return video/multimedia URLs found in a feed entry, de-duplicated in order.

    URLs are collected, in this order, from:
    1. ``enclosure`` links with a ``video/*`` type.
    2. Media RSS ``media_content`` items marked as video (``medium="video"``
       or a ``video/*`` type). The image extractors skip these items.
    3. ``src`` of ``<video>``, ``<iframe>``, ``<source>`` and ``<embed>`` tags
       in the summary/description. When the summary/description is empty, the
       first ``content`` value is searched instead.
    """

    def _is_video(item) -> bool:
        medium = str(item.get("medium") or "").lower()
        mime = str(item.get("type") or "").lower()
        return medium == "video" or mime.startswith("video")

    urls: list[str] = []

    # enclosure links with video/ type
    for link in entry.get("links", []):
        if link.get("rel") == "enclosure" and str(link.get("type", "")).startswith("video"):
            href = link.get("href", "")
            if href:
                urls.append(href)

    # media_content items flagged as video
    media = entry.get("media_content")
    if media and isinstance(media, list):
        for item in media:
            url = item.get("url")
            if url and _is_video(item):
                urls.append(url)

    # <video>, <iframe>, <source>, <embed> tags in HTML body
    html = entry.get("summary") or entry.get("description") or ""
    if not html:
        content = entry.get("content")
        if content and isinstance(content, list):
            html = content[0].get("value", "")
    if html:
        urls.extend(_VIDEO_RE.findall(html))

    # dict preserves insertion order -> order-preserving de-duplication
    return list(dict.fromkeys(urls))
|
|
|
def _passes_filters(feed: Feed, title: str, body: str) -> bool:
    """Apply the feed's comma-separated keyword filters (case-insensitive).

    Keywords are matched as plain substrings of "<title>\\n<body>". The entry
    passes when it contains at least one include keyword (only checked if any
    are configured) and none of the exclude keywords. Blank keywords are
    ignored.
    """

    def _keywords(raw: str) -> list[str]:
        return [word for word in (part.strip().lower() for part in raw.split(",")) if word]

    text = f"{title}\n{body}".lower()
    wanted = _keywords(feed.filter_include)
    banned = _keywords(feed.filter_exclude)

    if wanted and all(word not in text for word in wanted):
        return False
    return not any(word in text for word in banned)
|
|
|
def _parse_raw(xml: str):
    """Parse feed XML that has already been downloaded.

    ``feedparser.parse`` is blocking, so async callers run this through
    ``asyncio.to_thread``. The text is handed to feedparser wrapped in a
    ``StringIO`` file-like object.
    """
    stream = StringIO(xml)
    user_agent = "rss-ntfy/1.0 (+https://github.com)"
    return feedparser.parse(stream, agent=user_agent)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_feed(url: str, proxy: str = "") -> str:
    """Download a feed document over HTTP(S) and return its body as text.

    Args:
        url: Feed URL.
        proxy: Optional proxy URL passed to httpx. An empty, whitespace-only
            or ``None`` value means a direct connection.

    Returns:
        The response body as decoded by httpx (``Response.text``).

    Raises:
        httpx.HTTPError: transport failures and timeouts, or a final response
            that is not 2xx (via ``raise_for_status``).
    """
    client_kwargs: dict = {
        "timeout": 30,
        # httpx does not follow redirects by default, and raise_for_status()
        # treats the resulting 3xx as an error. Without this, a moved feed
        # (http->https, FeedBurner, CDN) would fail on every check.
        "follow_redirects": True,
    }
    proxy = (proxy or "").strip()
    if proxy:
        client_kwargs["proxy"] = proxy

    async with httpx.AsyncClient(**client_kwargs) as client:
        resp = await client.get(
            url,
            headers={"User-Agent": "rss-ntfy/1.0 (+https://github.com)"},
        )
        resp.raise_for_status()
        return resp.text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_full_article(url: str) -> tuple[str, str]:
    """Download *url* and extract the main article with trafilatura.

    This call blocks, because trafilatura performs its own HTTP download, so
    async callers run it through ``asyncio.to_thread``. The download does not
    go through httpx, so the configured proxy is not applied here.

    Returns:
        ``(plain_text, html)``: the article as plain text and as HTML, both
        stripped.
        - ``html`` is "" if trafilatura cannot render HTML.
        - ``("", "")`` is returned if trafilatura is not installed, the page
          cannot be downloaded, or extraction fails.
        Never raises; failures are logged at debug level.
    """
    try:
        import trafilatura
    except ImportError:
        log.debug("trafilatura не установлен, полный текст статьи недоступен")
        return "", ""

    try:
        downloaded = trafilatura.fetch_url(url)
        if downloaded is None:
            return "", ""
        plain = trafilatura.extract(
            downloaded, output_format="txt", with_metadata=False
        ) or ""
    except Exception:  # noqa: BLE001 - best-effort; caller keeps the RSS body
        log.debug("Не удалось извлечь статью %s", url, exc_info=True)
        return "", ""

    try:
        # Request HTML explicitly: trafilatura's "xml" format is its own XML
        # dialect (<doc>, <main>, <hi>…), not HTML, yet callers store and send
        # this value as HTML.
        html = trafilatura.extract(
            downloaded, output_format="html", with_metadata=False
        ) or ""
    except Exception:  # noqa: BLE001
        # NOTE(review): presumably an older trafilatura without the "html"
        # output format. Keep the plain text and let the caller fall back to
        # the feed's own HTML.
        log.debug("HTML-вывод trafilatura недоступен для %s", url, exc_info=True)
        html = ""

    return plain.strip(), html.strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_preview(url: str, include: str = "", exclude: str = "") -> dict:
    """Return the first entry of *url* that passes the given keyword filters.

    Used to preview a feed before saving it. Entries are scanned in feed order
    (presumably newest first for most feeds). ``include`` and ``exclude`` are
    comma-separated keyword lists with the same semantics as a feed's filters.

    Returns:
        dict with the keys "source", "title", "body", "image" and "link".

    Raises:
        ValueError: the feed can't be parsed, has no entries, or no entry
            matches the filters. Errors from the HTTP download propagate as-is.
    """
    with Session(engine) as session:
        proxy = get_settings(session).proxy_url

    document = await _fetch_feed(url, proxy=proxy)
    parsed = await asyncio.to_thread(_parse_raw, document)

    entries = parsed.entries
    if not entries:
        if getattr(parsed, "bozo", False):
            raise ValueError(str(getattr(parsed, "bozo_exception", "parse error")))
        raise ValueError("no entries")

    # Unsaved Feed instance, used only to carry the filters into _passes_filters.
    probe = Feed(url=url, filter_include=include, filter_exclude=exclude)
    source = parsed.feed.get("title", "") if parsed.feed else ""

    for entry in entries:
        title = entry.get("title", "")
        body = _strip_html(entry.get("summary") or entry.get("description") or "")
        if _passes_filters(probe, title, body):
            return {
                "source": source,
                "title": title or "(no title)",
                "body": body,
                "image": _extract_image(entry),
                "link": entry.get("link", ""),
            }

    raise ValueError("no entries match the filters")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _published_at(entry) -> datetime | None:
    """Return the entry's timestamp as an aware UTC datetime, or None.

    Uses feedparser's ``published_parsed`` and falls back to
    ``updated_parsed`` (Atom feeds often carry only <updated>).
    """
    stamp = entry.get("published_parsed") or entry.get("updated_parsed")
    if not stamp:
        return None
    try:
        return datetime(*stamp[:6], tzinfo=timezone.utc)
    except (TypeError, ValueError, OverflowError):
        return None


def _upsert_article(
    session: Session,
    db_feed: Feed,
    feed_title: str,
    *,
    title: str,
    body: str,
    raw_html: str,
    link: str,
    image: str,
    published_at: datetime | None,
) -> None:
    """Create or refresh the reader-view Article for one entry.

    Articles are de-duplicated per feed by link. Entries without a link always
    get a new row; otherwise every link-less entry would overwrite the same
    Article.
    """
    existing = None
    if link:
        existing = session.exec(
            select(Article).where(
                Article.feed_id == db_feed.id,
                Article.link == link,
            )
        ).first()

    if existing is None:
        session.add(Article(
            feed_id=db_feed.id,
            feed_title=db_feed.title or feed_title,
            title=title,
            body=body,
            full_html=raw_html,
            link=link,
            image=image,
            published_at=published_at,
        ))
        return

    existing.title = title
    existing.body = body
    existing.full_html = raw_html
    existing.image = image
    if published_at:
        existing.published_at = published_at
    session.add(existing)


async def check_feed(feed: Feed) -> str:
    """Check a single feed, dispatch new entries and log history.

    Steps:
    1. Download and parse the feed, then mark every unseen entry as seen.
    2. On the feed's very first check, stop there: state is recorded, nothing
       is sent.
    3. Otherwise, for each new entry (oldest first):
       - optionally fetch the full article;
       - apply the keyword filters;
       - store the entry for the reader view;
       - either queue it for the digest or dispatch it to the feed's channels,
         writing a Notification row in both cases.

    Returns:
        The status also written to ``Feed.last_status``: ``init:N``,
        ``sent:N``, ``sent:N:SKIPPED``, ``filtered:N``, ``nochange``,
        ``fetch_error:...``, ``parse_error:...`` or ``send_error:...``.
        Returns "Лента удалена" if the feed row no longer exists.
    """
    # Load settings early for the proxy URL.
    with Session(engine) as settings_session:
        proxy_url = get_settings(settings_session).proxy_url

    # Record download/parse exceptions like parse errors. This advances
    # last_checked and error_streak, so the feed is not re-fetched on every
    # tick and the admin alert can fire. Without this, the exception escapes
    # and is only logged by the caller.
    try:
        raw_xml = await _fetch_feed(feed.url, proxy=proxy_url)
        parsed = await asyncio.to_thread(_parse_raw, raw_xml)
    except Exception as exc:  # noqa: BLE001 - recorded as the feed status
        log.warning("Не удалось загрузить ленту %s: %s", feed.url, exc)
        status = f"fetch_error:{str(exc) or type(exc).__name__}"
        await _record_failure(feed.id, status)
        return status

    if getattr(parsed, "bozo", False) and not parsed.entries:
        exc = getattr(parsed, "bozo_exception", "parse error")
        status = f"parse_error:{exc}"
        await _record_failure(feed.id, status)
        return status

    feed_title = parsed.feed.get("title", "") if parsed.feed else ""

    with Session(engine) as session:
        settings = get_settings(session)
        db_feed = session.get(Feed, feed.id)
        if db_feed is None:
            return "Лента удалена"

        if feed_title and not db_feed.title:
            db_feed.title = feed_title

        seen_uids = set(
            session.exec(
                select(SeenEntry.entry_uid).where(SeenEntry.feed_id == feed.id)
            ).all()
        )
        first_run = not seen_uids

        sent = 0
        skipped = 0
        # Oldest first so notifications arrive in chronological order.
        for entry in reversed(parsed.entries):
            uid = _entry_uid(entry)
            if uid in seen_uids:
                continue
            seen_uids.add(uid)
            session.add(SeenEntry(feed_id=feed.id, entry_uid=uid))

            # On the very first check we only record state, never spam history.
            if first_run:
                continue

            title = entry.get("title") or "(без заголовка)"
            raw_html = entry.get("summary") or entry.get("description") or ""
            link = entry.get("link", "")
            full = db_feed.send_full_content
            fetch_full = db_feed.fetch_full_article
            body = _strip_html(raw_html, limit=0 if full else 1500)

            # Short RSS teaser: try to pull the full article text from the page.
            if fetch_full and link and len(body) < 500:
                try:
                    extra_text, extra_html = await asyncio.to_thread(
                        _extract_full_article, link
                    )
                except Exception:  # noqa: BLE001 - fall back to the RSS body
                    log.debug("Не удалось получить статью %s", link, exc_info=True)
                else:
                    if extra_text:
                        body = extra_text
                    if extra_html:
                        raw_html = extra_html

            if not _passes_filters(db_feed, title, body):
                skipped += 1
                continue

            msg = Message(
                source=db_feed.title or feed_title,
                title=title,
                body=body,
                link=link,
                image=_extract_image(entry),
                images=_extract_all_images(entry) if full else [],
                full_html=raw_html if full else "",
                videos=_extract_videos(entry) if full else [],
                full_content=full,
            )

            # Keep a copy for the built-in RSS reader. Only entries that reach
            # this point are stored; first-run and filtered-out entries were
            # skipped above.
            try:
                _upsert_article(
                    session,
                    db_feed,
                    feed_title,
                    title=title,
                    body=body,
                    raw_html=raw_html,
                    link=link,
                    image=msg.image,
                    published_at=_published_at(entry),
                )
            except Exception:  # noqa: BLE001 - article storage is best-effort
                log.warning("Не удалось сохранить статью %s", link or title, exc_info=True)

            # Digest mode: store instead of dispatching.
            if db_feed.digest_enabled:
                session.add(DigestEntry(
                    feed_id=db_feed.id,
                    title=title,
                    link=link,
                    body=body,
                    image=msg.image,
                    full_html=raw_html if full else "",
                    images=json.dumps(msg.images) if full else "[]",
                    videos=json.dumps(msg.videos) if full else "[]",
                    full_content=full,
                ))
                # Queued entries count as "sent" in the status line.
                sent += 1
                session.add(
                    Notification(
                        feed_id=db_feed.id,
                        feed_title=msg.source,
                        title=title,
                        link=msg.link,
                        channels="digest",
                        ok=True,
                        detail="queued for digest",
                    )
                )
                continue

            result = await delivery.dispatch(db_feed, settings, msg)
            session.add(
                Notification(
                    feed_id=db_feed.id,
                    feed_title=msg.source,
                    title=title,
                    link=msg.link,
                    channels=",".join(result.channels),
                    ok=result.ok,
                    detail=result.detail,
                )
            )
            if result.ok:
                sent += 1
            elif not result.channels:
                # Hard failure (e.g. ntfy unreachable) — surface it and stop.
                # NOTE(review): this entry's SeenEntry is already in the
                # session, so the commit marks it seen and it will not be
                # retried on the next check — confirm that is intended.
                db_feed.last_checked = datetime.now(timezone.utc)
                db_feed.last_status = f"send_error:{result.detail}"
                db_feed.error_streak += 1
                session.commit()
                await _maybe_alert(db_feed.id)
                return db_feed.last_status

        db_feed.last_checked = datetime.now(timezone.utc)
        db_feed.error_streak = 0
        if first_run:
            db_feed.last_status = f"init:{len(seen_uids)}"
        elif sent:
            db_feed.last_status = f"sent:{sent}:{skipped}" if skipped else f"sent:{sent}"
        elif skipped:
            db_feed.last_status = f"filtered:{skipped}"
        else:
            db_feed.last_status = "nochange"
        session.commit()
        return db_feed.last_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _record_failure(feed_id: int, status: str) -> None:
    """Persist a failed check for *feed_id*, then maybe send an admin alert.

    Stamps ``last_checked`` with the current UTC time, stores *status* as
    ``last_status`` and increments ``error_streak``. Does nothing (and sends
    no alert) if the feed row no longer exists.
    """
    with Session(engine) as session:
        db_feed = session.get(Feed, feed_id)
        if db_feed is None:
            return
        db_feed.error_streak += 1
        db_feed.last_status = status
        db_feed.last_checked = datetime.now(timezone.utc)
        session.commit()

    await _maybe_alert(feed_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _maybe_alert(feed_id: int) -> None:
    """Send one admin alert when a feed's error streak reaches the threshold.

    The alert fires only when ``error_streak`` equals ``alert_threshold``
    exactly, so a continuing outage produces a single alert rather than one
    per check. No-op when alerts are disabled or the feed no longer exists.
    """
    with Session(engine) as session:
        settings = get_settings(session)
        db_feed = session.get(Feed, feed_id)
        should_alert = (
            db_feed is not None
            and settings.alerts_enabled
            and db_feed.error_streak == settings.alert_threshold
        )
        if not should_alert:
            return

        label = db_feed.title or db_feed.url
        text = f"Feed \"{label}\" is failing ({db_feed.error_streak} consecutive errors)."
        await delivery.send_admin_alert(settings, text)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def check_all_feeds() -> None:
    """Check every enabled feed whose polling interval has elapsed.

    Meant to run on a one-minute tick. The feed's own ``interval`` (minutes)
    is used when it is positive; otherwise the global ``check_interval``
    applies. Feeds that were never checked are always due. Due feeds are
    checked one after another. An exception from one feed is logged and does
    not stop the others.
    """
    now = datetime.now(timezone.utc)
    with Session(engine) as session:
        settings = get_settings(session)
        feeds = session.exec(select(Feed).where(Feed.enabled == True)).all()  # noqa: E712
        default_interval = settings.check_interval

    def _is_due(feed: Feed) -> bool:
        if feed.last_checked is None:
            return True
        minutes = feed.interval if feed.interval and feed.interval > 0 else default_interval
        last = feed.last_checked
        if last.tzinfo is None:
            # Naive timestamps are treated as UTC.
            last = last.replace(tzinfo=timezone.utc)
        return (now - last).total_seconds() >= minutes * 60

    due = [feed for feed in feeds if _is_due(feed)]
    if not due:
        return

    log.info("Проверка %d из %d лент", len(due), len(feeds))
    for feed in due:
        try:
            await check_feed(feed)
        except Exception as exc:  # noqa: BLE001
            log.exception("Ошибка проверки ленты %s: %s", feed.url, exc)
|