"""APScheduler wrapper that ticks every minute and lets the checker decide which feeds are due (per-feed intervals are evaluated in check_all_feeds).""" from __future__ import annotations import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from datetime import datetime, timezone from sqlmodel import Session, select from . import delivery from .checker import check_all_feeds from .database import engine, get_settings from .delivery import Message from .models import DigestEntry, Feed, Notification log = logging.getLogger("scheduler") _scheduler: AsyncIOScheduler | None = None _JOB_ID = "check-feeds" _JOB_DIGEST = "send-digests" # Fixed tick; per-feed/global intervals are honoured inside check_all_feeds. _TICK_SECONDS = 60 async def send_due_digests() -> None: """Check each digest-enabled feed and send pending entries.""" now = datetime.now(timezone.utc) with Session(engine) as session: settings = get_settings(session) feeds = session.exec( select(Feed).where( Feed.enabled == True, # noqa: E712 Feed.digest_enabled == True, # noqa: E712 ) ).all() for feed in feeds: period_hours = max(1, feed.digest_period_hours) if feed.last_digest_at is not None: last = feed.last_digest_at.replace(tzinfo=timezone.utc) if feed.last_digest_at.tzinfo is None else feed.last_digest_at if (now - last).total_seconds() < period_hours * 3600: continue with Session(engine) as session: entries = session.exec( select(DigestEntry) .where(DigestEntry.feed_id == feed.id) .order_by(DigestEntry.created_at) ).all() if not entries: continue db_feed = session.get(Feed, feed.id) stg = get_settings(session) # Build digest message as Markdown list lines = [f"# πŸ“‘ {db_feed.title or 'ДайдТСст'}\n"] for e in entries: lines.append(f"**[{e.title}]({e.link})**") if e.body: preview = e.body[:200].replace("\n", " ") lines.append(f"> {preview}") lines.append("") digest_msg = Message( source=db_feed.title or "", title=f"ДайдТСст ({len(entries)} записСй)", body="\n".join(lines), link="", image="", full_content=True, ) result = await delivery.dispatch(db_feed, stg, digest_msg) session.add( Notification( feed_id=db_feed.id, feed_title=digest_msg.source, title=digest_msg.title, link="", channels=",".join(result.channels), ok=result.ok, detail=result.detail, ) ) # Clear sent entries for e in entries: session.delete(e) db_feed.last_digest_at = now db_feed.last_checked = now db_feed.last_status = f"digest:{len(entries)}" session.commit() def start(interval_minutes: int) -> None: global _scheduler if _scheduler is not None: return _scheduler = AsyncIOScheduler(timezone="UTC") _scheduler.add_job( check_all_feeds, trigger=IntervalTrigger(seconds=_TICK_SECONDS), id=_JOB_ID, max_instances=1, coalesce=True, replace_existing=True, ) _scheduler.add_job( send_due_digests, trigger=IntervalTrigger(seconds=300), # every 5 minutes id=_JOB_DIGEST, max_instances=1, coalesce=True, ) _scheduler.start() log.info("ΠŸΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊ Π·Π°ΠΏΡƒΡ‰Π΅Π½ (Ρ‚ΠΈΠΊ 60с), ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ %d ΠΌΠΈΠ½", interval_minutes) def reschedule(interval_minutes: int) -> None: # The global interval is read live by the checker each tick, so there is # nothing to reschedule β€” kept for API compatibility. log.info("Π˜Π½Ρ‚Π΅Ρ€Π²Π°Π» ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ ΠΈΠ·ΠΌΠ΅Π½Ρ‘Π½ Π½Π° %d ΠΌΠΈΠ½", interval_minutes) def shutdown() -> None: global _scheduler if _scheduler is not None: _scheduler.shutdown(wait=False) _scheduler = None