feat: add historical backfill with --init CLI and episode numbering

Adds a --init mode that seeds the database with past shows from a given
anchor episode/date forward, batch-fetching likes from SoundCloud and
partitioning them into weekly buckets. Episode numbers are tracked in
the shows table and auto-incremented by the poller for new shows.

Includes full API documentation (docs/api.md) and updated README.

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-12 02:09:15 -04:00
parent c88826ac4d
commit cb3ae403cf
14 changed files with 922 additions and 21 deletions

View File

@@ -61,6 +61,7 @@ def create_app(
tracks = db.get_show_tracks(show.id)
return {
"show_id": show.id,
"episode_number": show.episode_number,
"week_start": show.week_start.isoformat(),
"week_end": show.week_end.isoformat(),
"tracks": tracks,
@@ -80,6 +81,7 @@ def create_app(
return [
{
"id": s.id,
"episode_number": s.episode_number,
"week_start": s.week_start.isoformat(),
"week_end": s.week_end.isoformat(),
"created_at": s.created_at.isoformat(),
@@ -96,6 +98,7 @@ def create_app(
tracks = db.get_show_tracks(show.id)
return {
"show_id": show.id,
"episode_number": show.episode_number,
"week_start": show.week_start.isoformat(),
"week_end": show.week_end.isoformat(),
"tracks": tracks,

View File

@@ -0,0 +1,82 @@
import logging
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from ntr_fetcher.db import Database
from ntr_fetcher.soundcloud import SoundCloudClient
from ntr_fetcher.week import get_show_week
EASTERN = ZoneInfo("America/New_York")
logger = logging.getLogger(__name__)
def _compute_show_weeks(
    anchor_aired: date,
    anchor_episode: int,
    show_day: int,
    show_hour: int,
) -> list[tuple[int, datetime, datetime]]:
    """Return (episode_number, week_start_utc, week_end_utc) for each show
    from the anchor forward through the current date.

    The anchor is a known (episode number, air date) pair; later shows are
    assumed to air exactly 7 days apart, with episode numbers incrementing
    by one per week.
    """
    # The show is scheduled in US/Eastern, so evaluate "today" in that zone.
    # date.today() would use the server's local date and could differ from
    # the Eastern date near midnight, adding or dropping one show.
    today = datetime.now(EASTERN).date()
    weeks: list[tuple[int, datetime, datetime]] = []
    aired = anchor_aired
    episode = anchor_episode
    while aired <= today:
        # Anchor each air date at noon Eastern so DST transitions cannot
        # push the timestamp across a week boundary.
        noon_et = datetime(aired.year, aired.month, aired.day, 12, 0, 0, tzinfo=EASTERN)
        # astimezone() already returns an aware UTC datetime; the previous
        # trailing .replace(tzinfo=timezone.utc) was a no-op and is removed.
        noon_utc = noon_et.astimezone(timezone.utc)
        week_start, week_end = get_show_week(noon_utc, show_day, show_hour)
        weeks.append((episode, week_start, week_end))
        aired += timedelta(days=7)
        episode += 1
    return weeks
async def run_backfill(
    db: Database,
    soundcloud: SoundCloudClient,
    soundcloud_user: str,
    show_day: int,
    show_hour: int,
    anchor_episode: int,
    anchor_aired: date,
) -> None:
    """Seed the database with historical shows from the anchor forward.

    Builds one (episode, week) bucket per show, fetches every like in the
    overall time span in a single batch, then partitions the fetched tracks
    into their weekly shows.
    """
    schedule = _compute_show_weeks(anchor_aired, anchor_episode, show_day, show_hour)
    if not schedule:
        logger.warning("No show weeks to backfill")
        return

    first_episode, span_start, _ = schedule[0]
    last_episode, _, span_end = schedule[-1]
    last_aired = anchor_aired + timedelta(days=7 * (len(schedule) - 1))
    logger.info(
        "Backfilling %d shows: #%d (%s) through #%d (%s)",
        len(schedule),
        first_episode, anchor_aired.isoformat(),
        last_episode, last_aired.isoformat(),
    )

    # One batched fetch over the whole span instead of one call per week.
    user_id = await soundcloud.resolve_user(soundcloud_user)
    all_tracks = await soundcloud.fetch_likes(
        user_id=user_id,
        since=span_start,
        until=span_end,
    )
    logger.info("Fetched %d total tracks from SoundCloud", len(all_tracks))

    for liked in all_tracks:
        db.upsert_track(liked)

    for episode, week_start, week_end in schedule:
        show = db.get_or_create_show(week_start, week_end, episode_number=episode)
        # Half-open interval [week_start, week_end) so a like at the exact
        # boundary lands in exactly one show.
        bucket = sorted(
            (t for t in all_tracks if week_start <= t.liked_at < week_end),
            key=lambda t: t.liked_at,
        )
        db.set_show_tracks(show.id, [t.id for t in bucket])
        logger.info("Show #%d (%s): %d tracks", episode, week_start.isoformat(), len(bucket))

View File

@@ -20,7 +20,8 @@ CREATE TABLE IF NOT EXISTS shows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
week_start TEXT NOT NULL,
week_end TEXT NOT NULL,
created_at TEXT NOT NULL
created_at TEXT NOT NULL,
episode_number INTEGER
);
CREATE TABLE IF NOT EXISTS show_tracks (
@@ -49,6 +50,11 @@ class Database:
def initialize(self) -> None:
    """Create the schema and apply in-place migrations.

    Safe to call on every startup: CREATE TABLE IF NOT EXISTS statements
    are no-ops on existing databases, and the ALTER TABLE migration is
    skipped once the column exists.
    """
    conn = self._connect()
    try:
        conn.executescript(SCHEMA)
        try:
            # Migration for databases created before episode numbering:
            # SQLite has no ADD COLUMN IF NOT EXISTS, so probe and swallow
            # the "duplicate column name" error on already-migrated files.
            conn.execute("ALTER TABLE shows ADD COLUMN episode_number INTEGER")
            conn.commit()
        except sqlite3.OperationalError:
            pass
    finally:
        # Previously the connection leaked if executescript() raised.
        conn.close()
def upsert_track(self, track: Track) -> None:
@@ -102,26 +108,36 @@ class Database:
)
def get_or_create_show(
self, week_start: datetime, week_end: datetime
self,
week_start: datetime,
week_end: datetime,
episode_number: int | None = None,
) -> Show:
conn = self._connect()
row = conn.execute(
"SELECT id, week_start, week_end, created_at FROM shows "
"SELECT id, week_start, week_end, created_at, episode_number FROM shows "
"WHERE week_start = ? AND week_end = ?",
(week_start.isoformat(), week_end.isoformat()),
).fetchone()
if row is not None:
if episode_number is not None and row["episode_number"] != episode_number:
conn.execute(
"UPDATE shows SET episode_number = ? WHERE id = ?",
(episode_number, row["id"]),
)
conn.commit()
conn.close()
return Show(
id=row["id"],
week_start=datetime.fromisoformat(row["week_start"]),
week_end=datetime.fromisoformat(row["week_end"]),
created_at=datetime.fromisoformat(row["created_at"]),
episode_number=episode_number if episode_number is not None else row["episode_number"],
)
now = datetime.now(timezone.utc).isoformat()
cursor = conn.execute(
"INSERT INTO shows (week_start, week_end, created_at) VALUES (?, ?, ?)",
(week_start.isoformat(), week_end.isoformat(), now),
"INSERT INTO shows (week_start, week_end, created_at, episode_number) VALUES (?, ?, ?, ?)",
(week_start.isoformat(), week_end.isoformat(), now, episode_number),
)
conn.commit()
show_id = cursor.lastrowid
@@ -131,6 +147,7 @@ class Database:
week_start=week_start,
week_end=week_end,
created_at=datetime.fromisoformat(now),
episode_number=episode_number,
)
def get_show_tracks(self, show_id: int) -> list[dict]:
@@ -203,9 +220,9 @@ class Database:
conn = self._connect()
rows = conn.execute(
"""
SELECT id, week_start, week_end, created_at
SELECT id, week_start, week_end, created_at, episode_number
FROM shows
ORDER BY created_at DESC
ORDER BY week_start DESC
LIMIT ? OFFSET ?
""",
(limit, offset),
@@ -217,10 +234,28 @@ class Database:
week_start=datetime.fromisoformat(row["week_start"]),
week_end=datetime.fromisoformat(row["week_end"]),
created_at=datetime.fromisoformat(row["created_at"]),
episode_number=row["episode_number"],
)
for row in rows
]
def get_latest_episode_number(self) -> int | None:
    """Return the highest episode_number stored in shows, or None when no
    show has a number yet (empty table or pre-migration rows only)."""
    conn = self._connect()
    try:
        row = conn.execute(
            "SELECT MAX(episode_number) as max_ep FROM shows WHERE episode_number IS NOT NULL"
        ).fetchone()
    finally:
        # Previously the connection leaked if execute() raised.
        conn.close()
    # An aggregate SELECT always yields one row; max_ep is NULL (-> None)
    # when no rows matched the filter.
    return row["max_ep"] if row else None
def update_show_episode_number(self, show_id: int, episode_number: int) -> None:
    """Persist episode_number on an existing show row.

    No-op (zero rows affected) when show_id does not exist.
    """
    conn = self._connect()
    try:
        conn.execute(
            "UPDATE shows SET episode_number = ? WHERE id = ?",
            (episode_number, show_id),
        )
        conn.commit()
    finally:
        # Previously the connection leaked if execute()/commit() raised.
        conn.close()
def has_track_in_show(self, show_id: int, track_id: int) -> bool:
conn = self._connect()
row = conn.execute(

View File

@@ -1,9 +1,12 @@
import argparse
import asyncio
import logging
from datetime import date

import uvicorn

from ntr_fetcher.api import create_app
from ntr_fetcher.backfill import run_backfill
from ntr_fetcher.config import Settings
from ntr_fetcher.db import Database
from ntr_fetcher.poller import Poller
from ntr_fetcher.soundcloud import SoundCloudClient
@@ -16,13 +19,51 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="NtR SoundCloud Fetcher")
parser.add_argument(
"--init", action="store_true",
help="Run historical backfill instead of starting the server",
)
parser.add_argument(
"--show", type=int,
help="Anchor episode number (required with --init)",
)
parser.add_argument(
"--aired", type=date.fromisoformat,
help="Air date of anchor episode as YYYY-MM-DD (required with --init)",
)
args = parser.parse_args()
if args.init and (args.show is None or args.aired is None):
parser.error("--init requires both --show and --aired")
return args
def run() -> None:
args = _parse_args()
settings = Settings()
db = Database(settings.db_path)
db.initialize()
logger.info("Database initialized at %s", settings.db_path)
if args.init:
sc = SoundCloudClient()
asyncio.run(
run_backfill(
db=db,
soundcloud=sc,
soundcloud_user=settings.soundcloud_user,
show_day=settings.show_day,
show_hour=settings.show_hour,
anchor_episode=args.show,
anchor_aired=args.aired,
)
)
asyncio.run(sc.close())
logger.info("Backfill complete")
return
sc = SoundCloudClient()
poller = Poller(
db=db,

View File

@@ -21,6 +21,7 @@ class Show:
week_start: datetime
week_end: datetime
created_at: datetime
episode_number: int | None = None
@dataclass(frozen=True)

View File

@@ -40,6 +40,13 @@ class Poller:
week_start, week_end = get_show_week(now, self._show_day, self._show_hour)
show = self._db.get_or_create_show(week_start, week_end)
if show.episode_number is None:
latest = self._db.get_latest_episode_number()
if latest is not None:
new_ep = latest + 1
self._db.update_show_episode_number(show.id, new_ep)
logger.info("Auto-assigned episode #%d to show %d", new_ep, show.id)
tracks = await self._sc.fetch_likes(
user_id=user_id,
since=week_start,