feat: APScheduler manager with fetch interval and auto-publish

Made-with: Cursor
This commit is contained in:
cottongin
2026-04-06 15:18:37 -04:00
parent 01f8366e85
commit 50ff2e1533
2 changed files with 209 additions and 0 deletions

154
src/scheduler.py Normal file
View File

@@ -0,0 +1,154 @@
import json
import logging
from datetime import date, datetime, time, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import config
from app import db
from src.models import Article, Issue, Setting
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class SchedulerManager:
    """Own a ``BackgroundScheduler`` running the fetch and auto-publish jobs.

    Two job ids are used: ``"rss_fetch"`` (interval trigger) and
    ``"auto_publish"`` (cron trigger). Both job callbacks enter
    ``self.app.app_context()`` themselves before doing any work.
    """

    _FETCH_JOB_ID = "rss_fetch"
    _AUTO_PUBLISH_JOB_ID = "auto_publish"

    def __init__(self, app):
        """Keep a reference to ``app`` and create an unstarted scheduler.

        Args:
            app: Object whose ``app_context()`` is entered by the job
                callbacks (presumably the Flask application -- confirm
                against the caller).
        """
        self.app = app
        self.scheduler = BackgroundScheduler()

    def start(self):
        """Register the fetch job, restore auto-publish, start the scheduler.

        The fetch interval is read from the ``fetch_interval_hours`` setting,
        falling back to ``config.FETCH_INTERVAL_HOURS``. If an
        ``auto_publish`` setting is present and truthy, its cron job is
        scheduled as well.
        """
        interval = Setting.get(
            "fetch_interval_hours", default=config.FETCH_INTERVAL_HOURS
        )
        self.scheduler.add_job(
            self._run_fetch,
            IntervalTrigger(hours=interval),
            id=self._FETCH_JOB_ID,
            replace_existing=True,
        )

        auto_pub = Setting.get("auto_publish", default=None)
        if auto_pub:
            # Only schedule here: the setting was just read, so there is
            # nothing new to persist, and the cover method is not needed to
            # build the trigger.
            self._schedule_auto_publish(
                day_of_week=auto_pub["day_of_week"],
                hour=auto_pub["hour"],
                minute=auto_pub["minute"],
            )

        self.scheduler.start()
        logger.info("Scheduler started")

    def shutdown(self):
        """Shut the scheduler down with ``wait=False``; no-op if not running."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=False)

    def _run_fetch(self):
        """Job callback: fetch and cache articles inside an app context."""
        with self.app.app_context():
            # Imported lazily, as in the original code.
            from src.fetcher import fetch_and_cache_articles

            result = fetch_and_cache_articles()
            logger.info("Fetch completed: %s", result)

    def _run_auto_publish(self):
        """Job callback: build and record an issue for the current week.

        The week runs from Monday (``date.weekday() == 0``) through Sunday.
        Articles whose ``pub_date`` falls inside that window are selected in
        ascending ``pub_date`` order; when there are none the run is skipped.
        """
        with self.app.app_context():
            from src.cover import generate_cover
            from src.epub_builder import build_epub

            today = date.today()
            week_start = today - timedelta(days=today.weekday())
            week_end = week_start + timedelta(days=6)
            # Exclusive upper bound: midnight after the last day of the week.
            week_after = week_end + timedelta(days=1)

            articles = (
                Article.query.filter(
                    Article.pub_date >= datetime.combine(week_start, time.min)
                )
                .filter(Article.pub_date < datetime.combine(week_after, time.min))
                .order_by(Article.pub_date.asc())
                .all()
            )
            if not articles:
                logger.info("No articles for auto-publish, skipping")
                return

            article_ids = [a.id for a in articles]
            headlines = [a.title for a in articles]

            # disable_auto_publish() stores None under this key; fall back to
            # an empty dict so the lookup below cannot fail on a None value.
            auto_pub = Setting.get("auto_publish", {}) or {}
            method = auto_pub.get("cover_method", "text")

            cover_path = generate_cover(
                method, config.ISSUES_DIR, week_start, week_end, headlines
            )
            epub_path = build_epub(
                week_start, week_end, article_ids, cover_path, config.ISSUES_DIR
            )

            issue = Issue(
                week_start=week_start,
                week_end=week_end,
                cover_method=method,
                cover_path=cover_path,
                epub_path=epub_path,
                article_ids=json.dumps(article_ids),
                excluded_article_ids=json.dumps([]),
                status="published",
            )
            db.session.add(issue)
            db.session.commit()
            logger.info("Auto-published issue: %s", epub_path)

    def update_fetch_interval(self, hours: int):
        """Persist a new fetch interval and reschedule the fetch job.

        Args:
            hours: New interval between fetches, in hours. Must be positive.

        Raises:
            ValueError: If ``hours`` is zero or negative. Nothing is
                persisted or rescheduled in that case.
        """
        # A non-positive interval cannot describe a sensible fetch cadence;
        # reject it before it is written to the settings store.
        if hours <= 0:
            raise ValueError(f"fetch interval must be positive, got {hours!r}")

        Setting.set("fetch_interval_hours", hours)
        self.scheduler.reschedule_job(
            self._FETCH_JOB_ID, trigger=IntervalTrigger(hours=hours)
        )

    def enable_auto_publish(
        self,
        day_of_week: str,
        hour: int,
        minute: int,
        cover_method: str,
    ):
        """Persist the auto-publish configuration and (re)schedule its job.

        Args:
            day_of_week: Cron ``day_of_week`` expression, e.g. ``"sun"``.
            hour: Cron hour field.
            minute: Cron minute field.
            cover_method: Cover method stored with the setting; it is read
                back by the publish job when it runs.
        """
        Setting.set(
            "auto_publish",
            {
                "day_of_week": day_of_week,
                "hour": hour,
                "minute": minute,
                "cover_method": cover_method,
            },
        )
        self._schedule_auto_publish(
            day_of_week=day_of_week, hour=hour, minute=minute
        )

    def _schedule_auto_publish(self, day_of_week, hour, minute):
        """Add or replace the ``auto_publish`` cron job without persisting."""
        self.scheduler.add_job(
            self._run_auto_publish,
            CronTrigger(day_of_week=day_of_week, hour=hour, minute=minute),
            id=self._AUTO_PUBLISH_JOB_ID,
            replace_existing=True,
        )

    def disable_auto_publish(self):
        """Clear the auto-publish setting and remove its job if present."""
        # Local import keeps this block self-contained; apscheduler is
        # already a dependency of this module.
        from apscheduler.jobstores.base import JobLookupError

        Setting.set("auto_publish", None)
        try:
            self.scheduler.remove_job(self._AUTO_PUBLISH_JOB_ID)
        except JobLookupError:
            # The job was never scheduled or is already gone: nothing to do.
            # Any other failure is unexpected and is allowed to propagate.
            pass

    def get_status(self) -> dict:
        """Return a summary of the scheduler and its known jobs.

        Returns:
            A dict with ``"running"`` and, for each job that exists,
            ``"rss_fetch"`` (``next_run`` and ``interval_hours``) and/or
            ``"auto_publish"`` (``next_run``). ``next_run`` is the ``str()``
            of the job's next run time.
        """
        status = {"running": self.scheduler.running}

        fetch_job = self.scheduler.get_job(self._FETCH_JOB_ID)
        if fetch_job:
            status["rss_fetch"] = {
                # getattr: a job added before the scheduler has started may
                # not have ``next_run_time`` set yet.
                "next_run": str(getattr(fetch_job, "next_run_time", None)),
                "interval_hours": fetch_job.trigger.interval.total_seconds() / 3600,
            }

        pub_job = self.scheduler.get_job(self._AUTO_PUBLISH_JOB_ID)
        if pub_job:
            status["auto_publish"] = {
                "next_run": str(getattr(pub_job, "next_run_time", None)),
            }
        return status

55
tests/test_scheduler.py Normal file
View File

@@ -0,0 +1,55 @@
from src.scheduler import SchedulerManager
def test_scheduler_starts_fetch_job(app):
    """start() registers a job with id ``rss_fetch``."""
    with app.app_context():
        mgr = SchedulerManager(app)
        try:
            mgr.start()
            job_ids = [job.id for job in mgr.scheduler.get_jobs()]
            assert "rss_fetch" in job_ids
        finally:
            # Stop the background scheduler even when an assertion fails, so
            # its thread does not leak into later tests.
            mgr.shutdown()
def test_scheduler_update_fetch_interval(app):
    """update_fetch_interval(2) reschedules ``rss_fetch`` to every 7200 s."""
    with app.app_context():
        mgr = SchedulerManager(app)
        try:
            mgr.start()
            mgr.update_fetch_interval(2)
            job = mgr.scheduler.get_job("rss_fetch")
            assert job is not None
            assert job.trigger.interval.total_seconds() == 7200
        finally:
            # Stop the background scheduler even when an assertion fails, so
            # its thread does not leak into later tests.
            mgr.shutdown()
def test_scheduler_enable_auto_publish(app):
    """enable_auto_publish() registers a job with id ``auto_publish``."""
    with app.app_context():
        mgr = SchedulerManager(app)
        try:
            mgr.start()
            mgr.enable_auto_publish(
                day_of_week="sun", hour=6, minute=0, cover_method="text"
            )
            job = mgr.scheduler.get_job("auto_publish")
            assert job is not None
        finally:
            # Stop the background scheduler even when an assertion fails, so
            # its thread does not leak into later tests.
            mgr.shutdown()
def test_scheduler_disable_auto_publish(app):
    """disable_auto_publish() removes a previously enabled job."""
    with app.app_context():
        mgr = SchedulerManager(app)
        try:
            mgr.start()
            mgr.enable_auto_publish(
                day_of_week="sun", hour=6, minute=0, cover_method="text"
            )
            mgr.disable_auto_publish()
            job = mgr.scheduler.get_job("auto_publish")
            assert job is None
        finally:
            # Stop the background scheduler even when an assertion fails, so
            # its thread does not leak into later tests.
            mgr.shutdown()
def test_scheduler_get_status(app):
    """get_status() reports a running scheduler with an ``rss_fetch`` entry."""
    with app.app_context():
        mgr = SchedulerManager(app)
        try:
            mgr.start()
            status = mgr.get_status()
            assert status["running"] is True
            assert "rss_fetch" in status
        finally:
            # Stop the background scheduler even when an assertion fails, so
            # its thread does not leak into later tests.
            mgr.shutdown()