feat: improve scheduler UI and align fetch to half-hours with tests

Made-with: Cursor
This commit is contained in:
cottongin
2026-04-06 21:42:49 -04:00
parent 2cf44fe642
commit 767285119b
3 changed files with 60 additions and 9 deletions

View File

@@ -22,9 +22,17 @@ class SchedulerManager:
interval = Setting.get( interval = Setting.get(
"fetch_interval_hours", default=config.FETCH_INTERVAL_HOURS "fetch_interval_hours", default=config.FETCH_INTERVAL_HOURS
) )
# Calculate the next half-hour mark (XX:00 or XX:30)
now = datetime.now()
if now.minute < 30:
next_run = now.replace(minute=30, second=0, microsecond=0)
else:
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
self.scheduler.add_job( self.scheduler.add_job(
self._run_fetch, self._run_fetch,
IntervalTrigger(hours=interval), IntervalTrigger(hours=interval, start_date=next_run),
id="rss_fetch", id="rss_fetch",
replace_existing=True, replace_existing=True,
) )
@@ -153,14 +161,14 @@ class SchedulerManager:
def get_status(self) -> dict: def get_status(self) -> dict:
status = {"running": self.scheduler.running} status = {"running": self.scheduler.running}
fetch_job = self.scheduler.get_job("rss_fetch") fetch_job = self.scheduler.get_job("rss_fetch")
if fetch_job: if fetch_job and fetch_job.next_run_time:
status["rss_fetch"] = { status["rss_fetch"] = {
"next_run": str(fetch_job.next_run_time), "next_run": fetch_job.next_run_time.strftime("%b %d, %Y %I:%M %p"),
"interval_hours": fetch_job.trigger.interval.total_seconds() / 3600, "interval_hours": fetch_job.trigger.interval.total_seconds() / 3600,
} }
pub_job = self.scheduler.get_job("auto_publish") pub_job = self.scheduler.get_job("auto_publish")
if pub_job: if pub_job and pub_job.next_run_time:
status["auto_publish"] = { status["auto_publish"] = {
"next_run": str(pub_job.next_run_time), "next_run": pub_job.next_run_time.strftime("%b %d, %Y %I:%M %p"),
} }
return status return status

View File

@@ -21,10 +21,10 @@
<hgroup> <hgroup>
<h3>Scheduler</h3> <h3>Scheduler</h3>
<p> <p>
Status: <strong>{{ "Running" if scheduler_status.running else "Stopped" }}</strong> Status: <strong>{{ "Enabled" if scheduler_status.running else "Disabled" }}</strong>
{% if scheduler_status.rss_fetch %} {% if scheduler_status.rss_fetch %}
· Next fetch: {{ scheduler_status.rss_fetch.next_run }} &middot; Next fetch: {{ scheduler_status.rss_fetch.next_run }}
· Interval: {{ scheduler_status.rss_fetch.interval_hours }}h &middot; Interval: {{ scheduler_status.rss_fetch.interval_hours }}h
{% endif %} {% endif %}
</p> </p>
</hgroup> </hgroup>

View File

@@ -1,6 +1,7 @@
import json import json
import os import os
from datetime import date, datetime from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from PIL import Image as PILImage from PIL import Image as PILImage
@@ -63,6 +64,48 @@ def test_scheduler_get_status(app):
mgr.shutdown() mgr.shutdown()
def test_scheduler_starts_on_half_hour(app):
with app.app_context():
mgr = SchedulerManager(app)
# Test when current time is before :30
with patch('src.scheduler.datetime') as mock_dt:
# Use a date far in the future so APScheduler doesn't fast-forward it
mock_dt.now.return_value = datetime(2030, 4, 6, 10, 15, 0)
mgr.start()
job = mgr.scheduler.get_job("rss_fetch")
assert job.next_run_time.minute == 30
assert job.next_run_time.hour == 10
mgr.shutdown()
mgr = SchedulerManager(app)
# Test when current time is after :30
with patch('src.scheduler.datetime') as mock_dt:
mock_dt.now.return_value = datetime(2030, 4, 6, 10, 45, 0)
mgr.start()
job = mgr.scheduler.get_job("rss_fetch")
assert job.next_run_time.minute == 0
assert job.next_run_time.hour == 11
mgr.shutdown()
def test_scheduler_get_status_formatting(app):
with app.app_context():
mgr = SchedulerManager(app)
with patch('src.scheduler.datetime') as mock_dt:
# Force it to start at exactly 10:30 in the future
mock_dt.now.return_value = datetime(2030, 4, 6, 10, 15, 0)
mgr.start()
# The job's next_run_time will be timezone aware in APScheduler
status = mgr.get_status()
assert "rss_fetch" in status
# Should look like "Apr 06, 2030 10:30 AM"
assert "Apr 06, 2030 10:30 AM" in status["rss_fetch"]["next_run"]
mgr.shutdown()
def test_auto_publish_passes_ordered_image_paths_to_generate_cover(app, tmp_path, db): def test_auto_publish_passes_ordered_image_paths_to_generate_cover(app, tmp_path, db):
"""First image per article in pub_date order passed to generate_cover.""" """First image per article in pub_date order passed to generate_cover."""
os.makedirs(tmp_path, exist_ok=True) os.makedirs(tmp_path, exist_ok=True)