from src.scheduler import SchedulerManager def test_scheduler_starts_fetch_job(app): with app.app_context(): mgr = SchedulerManager(app) mgr.start() jobs = mgr.scheduler.get_jobs() job_ids = [j.id for j in jobs] assert "rss_fetch" in job_ids mgr.shutdown() def test_scheduler_update_fetch_interval(app): with app.app_context(): mgr = SchedulerManager(app) mgr.start() mgr.update_fetch_interval(2) job = mgr.scheduler.get_job("rss_fetch") assert job is not None assert job.trigger.interval.total_seconds() == 7200 mgr.shutdown() def test_scheduler_enable_auto_publish(app): with app.app_context(): mgr = SchedulerManager(app) mgr.start() mgr.enable_auto_publish(day_of_week="sun", hour=6, minute=0, cover_method="text") job = mgr.scheduler.get_job("auto_publish") assert job is not None mgr.shutdown() def test_scheduler_disable_auto_publish(app): with app.app_context(): mgr = SchedulerManager(app) mgr.start() mgr.enable_auto_publish(day_of_week="sun", hour=6, minute=0, cover_method="text") mgr.disable_auto_publish() job = mgr.scheduler.get_job("auto_publish") assert job is None mgr.shutdown() def test_scheduler_get_status(app): with app.app_context(): mgr = SchedulerManager(app) mgr.start() status = mgr.get_status() assert status["running"] is True assert "rss_fetch" in status mgr.shutdown()