834092a3ec
build-and-push / docker (push) Has been cancelled
- Full article extraction via trafilatura (fetch_full_article)
- Digest mode with configurable period (digest_enabled, digest_period_hours)
- ntfy Actions buttons (Open article, Open feed)
- Notification templates with {title}, {body}, {link}, {source}, {image_url}
- FTS5 full-text search in notification history
- Database backup/restore (download/upload .db)
- HTTP/SOCKS proxy for RSS feed fetching (proxy_url setting)
- Built-in RSS reader tab with categories, unread counts, article detail view
- Auto-category 'Общее' for feeds without a category
- Article storage (Article table) for reader
- DigestEntry model for pending digest entries
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
"""APScheduler wrapper that ticks every minute and lets the checker decide
|
|
which feeds are due (per-feed intervals are evaluated in check_all_feeds)."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlmodel import Session, select
|
|
|
|
from . import delivery
|
|
from .checker import check_all_feeds
|
|
from .database import engine, get_settings
|
|
from .delivery import Message
|
|
from .models import DigestEntry, Feed, Notification
|
|
|
|
log = logging.getLogger("scheduler")
|
|
|
|
_scheduler: AsyncIOScheduler | None = None
|
|
_JOB_ID = "check-feeds"
|
|
_JOB_DIGEST = "send-digests"
|
|
# Fixed tick; per-feed/global intervals are honoured inside check_all_feeds.
|
|
_TICK_SECONDS = 60
|
|
|
|
|
|
async def send_due_digests() -> None:
|
|
"""Check each digest-enabled feed and send pending entries."""
|
|
now = datetime.now(timezone.utc)
|
|
with Session(engine) as session:
|
|
settings = get_settings(session)
|
|
feeds = session.exec(
|
|
select(Feed).where(
|
|
Feed.enabled == True, # noqa: E712
|
|
Feed.digest_enabled == True, # noqa: E712
|
|
)
|
|
).all()
|
|
|
|
for feed in feeds:
|
|
period_hours = max(1, feed.digest_period_hours)
|
|
if feed.last_digest_at is not None:
|
|
last = feed.last_digest_at.replace(tzinfo=timezone.utc) if feed.last_digest_at.tzinfo is None else feed.last_digest_at
|
|
if (now - last).total_seconds() < period_hours * 3600:
|
|
continue
|
|
|
|
with Session(engine) as session:
|
|
entries = session.exec(
|
|
select(DigestEntry)
|
|
.where(DigestEntry.feed_id == feed.id)
|
|
.order_by(DigestEntry.created_at)
|
|
).all()
|
|
if not entries:
|
|
continue
|
|
|
|
db_feed = session.get(Feed, feed.id)
|
|
stg = get_settings(session)
|
|
|
|
# Build digest message as Markdown list
|
|
lines = [f"# 📡 {db_feed.title or 'Дайджест'}\n"]
|
|
for e in entries:
|
|
lines.append(f"**[{e.title}]({e.link})**")
|
|
if e.body:
|
|
preview = e.body[:200].replace("\n", " ")
|
|
lines.append(f"> {preview}")
|
|
lines.append("")
|
|
|
|
digest_msg = Message(
|
|
source=db_feed.title or "",
|
|
title=f"Дайджест ({len(entries)} записей)",
|
|
body="\n".join(lines),
|
|
link="",
|
|
image="",
|
|
full_content=True,
|
|
)
|
|
|
|
result = await delivery.dispatch(db_feed, stg, digest_msg)
|
|
|
|
session.add(
|
|
Notification(
|
|
feed_id=db_feed.id,
|
|
feed_title=digest_msg.source,
|
|
title=digest_msg.title,
|
|
link="",
|
|
channels=",".join(result.channels),
|
|
ok=result.ok,
|
|
detail=result.detail,
|
|
)
|
|
)
|
|
|
|
# Clear sent entries
|
|
for e in entries:
|
|
session.delete(e)
|
|
|
|
db_feed.last_digest_at = now
|
|
db_feed.last_checked = now
|
|
db_feed.last_status = f"digest:{len(entries)}"
|
|
session.commit()
|
|
|
|
|
|
def start(interval_minutes: int) -> None:
|
|
global _scheduler
|
|
if _scheduler is not None:
|
|
return
|
|
_scheduler = AsyncIOScheduler(timezone="UTC")
|
|
_scheduler.add_job(
|
|
check_all_feeds,
|
|
trigger=IntervalTrigger(seconds=_TICK_SECONDS),
|
|
id=_JOB_ID,
|
|
max_instances=1,
|
|
coalesce=True,
|
|
replace_existing=True,
|
|
)
|
|
_scheduler.add_job(
|
|
send_due_digests,
|
|
trigger=IntervalTrigger(seconds=300), # every 5 minutes
|
|
id=_JOB_DIGEST,
|
|
max_instances=1,
|
|
coalesce=True,
|
|
)
|
|
_scheduler.start()
|
|
log.info("Планировщик запущен (тик 60с), интервал по умолчанию %d мин", interval_minutes)
|
|
|
|
|
|
def reschedule(interval_minutes: int) -> None:
|
|
# The global interval is read live by the checker each tick, so there is
|
|
# nothing to reschedule — kept for API compatibility.
|
|
log.info("Интервал по умолчанию изменён на %d мин", interval_minutes)
|
|
|
|
|
|
def shutdown() -> None:
|
|
global _scheduler
|
|
if _scheduler is not None:
|
|
_scheduler.shutdown(wait=False)
|
|
_scheduler = None
|