"""Background RSS polling and dispatch across channels."""
from __future__ import annotations
import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from html import unescape
import httpx
from io import StringIO
import feedparser
from sqlmodel import Session, select
from . import delivery
from .database import engine, get_settings
from .delivery import Message
from .models import Article, DigestEntry, Feed, Notification, SeenEntry
log = logging.getLogger("checker")
_TAG_RE = re.compile(r"<[^>]+>")
_IMG_RE = re.compile(r'
]+src=["\']([^"\']+)["\']', re.IGNORECASE)
_VIDEO_RE = re.compile(
r'<(?:video|iframe|source|embed)[^>]*src=["\']([^"\']+)["\']', re.IGNORECASE
)
_ENC_VIDEO_RE = re.compile(r'<(?:video|iframe|source|embed)[^>]*>', re.IGNORECASE)
def _strip_html(text: str, limit: int = 1500) -> str:
    """Convert an HTML fragment to trimmed plain text.

    Each tag is replaced by a space, HTML entities are decoded, runs of
    spaces/tabs are squeezed to a single space, and three or more line
    breaks (optionally separated by whitespace) are reduced to one blank
    line. A falsy ``text`` is treated as the empty string.

    Args:
        text: HTML (or plain) text to clean.
        limit: Maximum length before truncation; a falsy value disables
            truncation.

    Returns:
        The cleaned text. When it exceeds ``limit`` characters it is cut at
        ``limit``, everything after the last space in that prefix is dropped,
        and ``" …"`` is appended.
    """
    without_tags = _TAG_RE.sub(" ", text or "")
    plain = unescape(without_tags)
    plain = re.sub(r"[ \t]+", " ", plain)
    plain = re.sub(r"\n\s*\n\s*\n+", "\n\n", plain).strip()

    if not limit or len(plain) <= limit:
        return plain

    # Cut on the last space inside the prefix so a word is not split in half.
    head, *_ = plain[:limit].rsplit(" ", 1)
    return head + " …"
def _entry_uid(entry) -> str:
for key in ("id", "guid", "link"):
value = entry.get(key)
if value:
return str(value)
return f"{entry.get('title', '')}|{entry.get('published', '')}"
def _extract_image(entry) -> str:
"""Best-effort: find an image URL in media tags, enclosures or HTML."""
media = entry.get("media_content") or entry.get("media_thumbnail")
if media and isinstance(media, list):
url = media[0].get("url")
if url:
return url
for link in entry.get("links", []):
if link.get("rel") == "enclosure" and str(link.get("type", "")).startswith("image"):
return link.get("href", "")
html = entry.get("summary") or entry.get("description") or ""
if not html:
content = entry.get("content")
if content and isinstance(content, list):
html = content[0].get("value", "")
match = _IMG_RE.search(html or "")
return match.group(1) if match else ""
def _extract_all_images(entry) -> list[str]:
"""Extract ALL image URLs from a feed entry (deduplicated, order preserved)."""
urls: list[str] = []
# media_content / media_thumbnail
for key in ("media_content", "media_thumbnail"):
media = entry.get(key)
if media and isinstance(media, list):
for item in media:
url = item.get("url")
if url:
urls.append(url)
# enclosure links with image/ type
for link in entry.get("links", []):
if link.get("rel") == "enclosure" and str(link.get("type", "")).startswith("image"):
href = link.get("href", "")
if href:
urls.append(href)
#
tags in HTML body
html = entry.get("summary") or entry.get("description") or ""
if not html:
content = entry.get("content")
if content and isinstance(content, list):
html = content[0].get("value", "")
urls.extend(_IMG_RE.findall(html or ""))
# deduplicate preserving order
return list(dict.fromkeys(urls))
def _extract_videos(entry) -> list[str]:
"""Extract video/multimedia URLs from a feed entry."""
urls: list[str] = []
# enclosure links with video/ type
for link in entry.get("links", []):
if link.get("rel") == "enclosure" and str(link.get("type", "")).startswith("video"):
href = link.get("href", "")
if href:
urls.append(href)
#