56 lines
1.6 KiB
Python
56 lines
1.6 KiB
Python
from src.scheduler import SchedulerManager
|
|
|
|
|
|
def test_scheduler_starts_fetch_job(app):
|
|
with app.app_context():
|
|
mgr = SchedulerManager(app)
|
|
mgr.start()
|
|
jobs = mgr.scheduler.get_jobs()
|
|
job_ids = [j.id for j in jobs]
|
|
assert "rss_fetch" in job_ids
|
|
mgr.shutdown()
|
|
|
|
|
|
def test_scheduler_update_fetch_interval(app):
|
|
with app.app_context():
|
|
mgr = SchedulerManager(app)
|
|
mgr.start()
|
|
mgr.update_fetch_interval(2)
|
|
job = mgr.scheduler.get_job("rss_fetch")
|
|
assert job is not None
|
|
assert job.trigger.interval.total_seconds() == 7200
|
|
mgr.shutdown()
|
|
|
|
|
|
def test_scheduler_enable_auto_publish(app):
|
|
with app.app_context():
|
|
mgr = SchedulerManager(app)
|
|
mgr.start()
|
|
mgr.enable_auto_publish(day_of_week="sun", hour=6, minute=0,
|
|
cover_method="text")
|
|
job = mgr.scheduler.get_job("auto_publish")
|
|
assert job is not None
|
|
mgr.shutdown()
|
|
|
|
|
|
def test_scheduler_disable_auto_publish(app):
|
|
with app.app_context():
|
|
mgr = SchedulerManager(app)
|
|
mgr.start()
|
|
mgr.enable_auto_publish(day_of_week="sun", hour=6, minute=0,
|
|
cover_method="text")
|
|
mgr.disable_auto_publish()
|
|
job = mgr.scheduler.get_job("auto_publish")
|
|
assert job is None
|
|
mgr.shutdown()
|
|
|
|
|
|
def test_scheduler_get_status(app):
|
|
with app.app_context():
|
|
mgr = SchedulerManager(app)
|
|
mgr.start()
|
|
status = mgr.get_status()
|
|
assert status["running"] is True
|
|
assert "rss_fetch" in status
|
|
mgr.shutdown()
|