From 7eaf036d4bbfffde5894c66cd022a12fb8356d95 Mon Sep 17 00:00:00 2001 From: cottongin Date: Thu, 12 Mar 2026 01:05:52 -0400 Subject: [PATCH] plan: add implementation plan for NtR SoundCloud fetcher 13-task TDD implementation plan covering project scaffolding, config, week boundaries, database, SoundCloud client, poller, API, and entry point. Made-with: Cursor --- .../2026-03-12-ntr-fetcher-implementation.md | 2017 +++++++++++++++++ 1 file changed, 2017 insertions(+) create mode 100644 docs/plans/2026-03-12-ntr-fetcher-implementation.md diff --git a/docs/plans/2026-03-12-ntr-fetcher-implementation.md b/docs/plans/2026-03-12-ntr-fetcher-implementation.md new file mode 100644 index 0000000..bb8783c --- /dev/null +++ b/docs/plans/2026-03-12-ntr-fetcher-implementation.md @@ -0,0 +1,2017 @@ +# NtR SoundCloud Fetcher Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Build a Python service that polls NicktheRat's SoundCloud likes, builds weekly playlists, and serves them via a JSON API. + +**Architecture:** Single async Python process — FastAPI serves the JSON API, an async background task polls SoundCloud hourly, and a supervisor loop restarts the poller on failure. SQLite stores all state. See `docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md` for the full design. + +**Tech Stack:** Python 3.11+, FastAPI, uvicorn, httpx, pydantic, sqlite3, zoneinfo. + +**Reference docs:** +- Design: `docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md` +- SoundCloud API: `docs/soundcloud-likes-api.md` + +--- + +## Task 1: Project Scaffolding + +**Files:** +- Create: `pyproject.toml` +- Create: `src/ntr_fetcher/__init__.py` +- Create: `tests/__init__.py` +- Create: `tests/conftest.py` + +**Step 1: Create `pyproject.toml`** + +```toml +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "ntr-fetcher" +version = "0.1.0" +description = "SoundCloud likes fetcher for Nick the Rat Radio" +requires-python = ">=3.11" +dependencies = [ + "fastapi", + "uvicorn[standard]", + "httpx", + "pydantic-settings", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "pytest-httpx", + "ruff", +] + +[project.scripts] +ntr-fetcher = "ntr_fetcher.main:run" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + +[tool.ruff] +target-version = "py311" +src = ["src"] +``` + +**Step 2: Create package init** + +```python +# src/ntr_fetcher/__init__.py +``` + +(Empty file — just marks the package.) + +**Step 3: Create test scaffolding** + +```python +# tests/__init__.py +``` + +```python +# tests/conftest.py +import pytest +``` + +**Step 4: Install the project in dev mode** + +Run: `pip install -e ".[dev]"` +Expected: Successfully installed with all dependencies. + +**Step 5: Verify pytest runs** + +Run: `pytest --co` +Expected: "no tests ran" (collected 0 items), exit code 0 or 5 (no tests found). + +**Step 6: Commit** + +```bash +git add pyproject.toml src/ tests/ +git commit -m "scaffold: project structure with pyproject.toml and test config" +``` + +--- + +## Task 2: Configuration Module + +**Files:** +- Create: `src/ntr_fetcher/config.py` +- Create: `tests/test_config.py` + +**Step 1: Write the failing test** + +```python +# tests/test_config.py +import os + +from ntr_fetcher.config import Settings + + +def test_settings_defaults(): + settings = Settings(admin_token="test-secret") + assert settings.port == 8000 + assert settings.host == "127.0.0.1" + assert settings.db_path == "./ntr_fetcher.db" + assert settings.poll_interval_seconds == 3600 + assert settings.soundcloud_user == "nicktherat" + assert settings.show_day == 2 + assert settings.show_hour == 22 + + +def test_settings_from_env(monkeypatch): + monkeypatch.setenv("NTR_PORT", "9090") + monkeypatch.setenv("NTR_HOST", "0.0.0.0") + monkeypatch.setenv("NTR_ADMIN_TOKEN", "my-secret") + monkeypatch.setenv("NTR_SOUNDCLOUD_USER", "someoneelse") + settings = Settings() + assert settings.port == 9090 + assert settings.host == "0.0.0.0" + assert settings.admin_token == "my-secret" + assert settings.soundcloud_user == "someoneelse" + + +def test_settings_admin_token_required(): + import pytest + with pytest.raises(Exception): + Settings() +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_config.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'ntr_fetcher.config'` + +**Step 3: Write the implementation** + +```python +# src/ntr_fetcher/config.py +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + model_config = {"env_prefix": "NTR_"} + + port: int = 8000 + host: str = "127.0.0.1" + db_path: str = "./ntr_fetcher.db" + poll_interval_seconds: int = 3600 + admin_token: str + soundcloud_user: str = "nicktherat" + show_day: int = 2 + show_hour: int = 22 +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_config.py -v` +Expected: 3 passed. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/config.py tests/test_config.py +git commit -m "feat: add configuration module with pydantic-settings" +``` + +--- + +## Task 3: Week Boundary Computation + +**Files:** +- Create: `src/ntr_fetcher/week.py` +- Create: `tests/test_week.py` + +This module computes the Wednesday 22:00 Eastern boundaries in UTC. Must handle EST/EDT transitions correctly. + +**Step 1: Write the failing tests** + +```python +# tests/test_week.py +from datetime import datetime, timezone + +from ntr_fetcher.week import get_show_week, Show_DAY_DEFAULT, SHOW_HOUR_DEFAULT + + +def test_mid_week_thursday(): + """Thursday should belong to the show that started the previous Wednesday.""" + # Thursday March 13, 2026 15:00 UTC (11:00 AM EDT) + now = datetime(2026, 3, 13, 15, 0, 0, tzinfo=timezone.utc) + start, end = get_show_week(now, show_day=2, show_hour=22) + # Show started Wed March 12 22:00 EDT = March 13 02:00 UTC (DST active since March 8) + assert start == datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + # Show ends next Wed March 19 22:00 EDT = March 20 02:00 UTC + assert end == datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + + +def test_wednesday_before_show(): + """Wednesday before 22:00 ET belongs to the previous week's show.""" + # Wednesday March 12, 2026 20:00 UTC (16:00 EDT — before 22:00 EDT) + now = datetime(2026, 3, 12, 20, 0, 0, tzinfo=timezone.utc) + start, end = get_show_week(now, show_day=2, show_hour=22) + # Previous Wed was March 5 22:00 EST = March 6 03:00 UTC (before DST) + assert start == datetime(2026, 3, 6, 3, 0, 0, tzinfo=timezone.utc) + # Ends this Wed March 12 22:00 EDT = March 13 02:00 UTC (after DST) + assert end == datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + + +def test_wednesday_after_show_starts(): + """Wednesday at or after 22:00 ET belongs to the new week.""" + # Wednesday March 12, 2026 at 22:30 EDT = March 13 02:30 UTC + now = datetime(2026, 3, 13, 2, 30, 0, tzinfo=timezone.utc) + start, end = get_show_week(now, show_day=2, show_hour=22) + assert start == datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + assert end == datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + + +def test_est_period_no_dst(): + """January — firmly in EST (UTC-5).""" + # Thursday Jan 15, 2026 12:00 UTC + now = datetime(2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc) + start, end = get_show_week(now, show_day=2, show_hour=22) + # Wed Jan 14 22:00 EST = Jan 15 03:00 UTC + assert start == datetime(2026, 1, 15, 3, 0, 0, tzinfo=timezone.utc) + # Wed Jan 21 22:00 EST = Jan 22 03:00 UTC + assert end == datetime(2026, 1, 22, 3, 0, 0, tzinfo=timezone.utc) +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_week.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Write the implementation** + +```python +# src/ntr_fetcher/week.py +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo + +EASTERN = ZoneInfo("America/New_York") +Show_DAY_DEFAULT = 2 # Wednesday (Monday=0) +SHOW_HOUR_DEFAULT = 22 + + +def get_show_week( + now_utc: datetime, + show_day: int = Show_DAY_DEFAULT, + show_hour: int = SHOW_HOUR_DEFAULT, +) -> tuple[datetime, datetime]: + """Return (week_start_utc, week_end_utc) for the show week containing now_utc. + + The week starts at show_day at show_hour Eastern Time and runs for 7 days. + """ + now_et = now_utc.astimezone(EASTERN) + + # Find the most recent show_day at show_hour that is <= now + days_since_show_day = (now_et.weekday() - show_day) % 7 + candidate_date = now_et.date() - timedelta(days=days_since_show_day) + candidate = datetime( + candidate_date.year, + candidate_date.month, + candidate_date.day, + show_hour, + 0, + 0, + tzinfo=EASTERN, + ) + + if candidate > now_et: + # We're on the show day but before the show hour — go back a week + candidate -= timedelta(days=7) + + week_start_utc = candidate.astimezone(timezone.utc).replace(tzinfo=timezone.utc) + week_end_utc = (candidate + timedelta(days=7)).astimezone(timezone.utc).replace(tzinfo=timezone.utc) + + return week_start_utc, week_end_utc +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_week.py -v` +Expected: 4 passed. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/week.py tests/test_week.py +git commit -m "feat: add week boundary computation with DST handling" +``` + +--- + +## Task 4: Models + +**Files:** +- Create: `src/ntr_fetcher/models.py` + +Simple dataclasses for passing data between layers. No test needed — these are pure data containers validated by type checkers and used transitively in other tests. + +**Step 1: Write the models** + +```python +# src/ntr_fetcher/models.py +from dataclasses import dataclass +from datetime import datetime + + +@dataclass(frozen=True) +class Track: + id: int + title: str + artist: str + permalink_url: str + artwork_url: str | None + duration_ms: int + license: str + liked_at: datetime + raw_json: str + + +@dataclass(frozen=True) +class Show: + id: int + week_start: datetime + week_end: datetime + created_at: datetime + + +@dataclass(frozen=True) +class ShowTrack: + show_id: int + track_id: int + position: int +``` + +**Step 2: Commit** + +```bash +git add src/ntr_fetcher/models.py +git commit -m "feat: add data models for Track, Show, ShowTrack" +``` + +--- + +## Task 5: Database Module — Schema & Migrations + +**Files:** +- Create: `src/ntr_fetcher/db.py` +- Create: `tests/test_db.py` + +This is a larger module. We'll build it in stages: schema first, then queries. + +**Step 1: Write the failing test for schema creation** + +```python +# tests/test_db.py +import sqlite3 + +import pytest + +from ntr_fetcher.db import Database + + +@pytest.fixture +def db(tmp_path): + db_path = str(tmp_path / "test.db") + database = Database(db_path) + database.initialize() + return database + + +def test_tables_created(db): + conn = sqlite3.connect(db.path) + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" + ) + tables = [row[0] for row in cursor.fetchall()] + conn.close() + assert "tracks" in tables + assert "shows" in tables + assert "show_tracks" in tables + + +def test_initialize_idempotent(db): + """Calling initialize twice doesn't raise.""" + db.initialize() +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_db.py::test_tables_created -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Write the schema initialization** + +```python +# src/ntr_fetcher/db.py +import sqlite3 +from datetime import datetime, timezone + +from ntr_fetcher.models import Track, Show, ShowTrack + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS tracks ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + artist TEXT NOT NULL, + permalink_url TEXT NOT NULL, + artwork_url TEXT, + duration_ms INTEGER NOT NULL, + license TEXT NOT NULL DEFAULT '', + liked_at TEXT NOT NULL, + raw_json TEXT NOT NULL DEFAULT '{}' +); + +CREATE TABLE IF NOT EXISTS shows ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + week_start TEXT NOT NULL, + week_end TEXT NOT NULL, + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS show_tracks ( + show_id INTEGER NOT NULL REFERENCES shows(id), + track_id INTEGER NOT NULL REFERENCES tracks(id), + position INTEGER NOT NULL, + UNIQUE(show_id, track_id) +); + +CREATE INDEX IF NOT EXISTS idx_show_tracks_show_id ON show_tracks(show_id); +CREATE INDEX IF NOT EXISTS idx_tracks_liked_at ON tracks(liked_at); +""" + + +class Database: + def __init__(self, path: str): + self.path = path + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.path) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.row_factory = sqlite3.Row + return conn + + def initialize(self) -> None: + conn = self._connect() + conn.executescript(SCHEMA) + conn.close() +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_db.py -v` +Expected: 2 passed. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/db.py tests/test_db.py +git commit -m "feat: add database module with schema initialization" +``` + +--- + +## Task 6: Database Module — Queries + +**Files:** +- Modify: `src/ntr_fetcher/db.py` +- Modify: `tests/test_db.py` + +Add methods for all the CRUD operations the service needs. + +**Step 1: Write the failing tests** + +Append to `tests/test_db.py`: + +```python +from datetime import datetime, timezone + +from ntr_fetcher.models import Track + + +def _make_track(id: int, liked_at: str, title: str = "Test", artist: str = "Artist") -> Track: + return Track( + id=id, + title=title, + artist=artist, + permalink_url=f"https://soundcloud.com/test/track-{id}", + artwork_url=None, + duration_ms=180000, + license="cc-by", + liked_at=datetime.fromisoformat(liked_at), + raw_json="{}", + ) + + +def test_upsert_track(db): + track = _make_track(100, "2026-03-10T12:00:00+00:00") + db.upsert_track(track) + result = db.get_track(100) + assert result is not None + assert result.title == "Test" + + +def test_upsert_track_updates_existing(db): + track1 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Original") + db.upsert_track(track1) + track2 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Updated") + db.upsert_track(track2) + result = db.get_track(100) + assert result.title == "Updated" + + +def test_get_or_create_show(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + assert show.id is not None + assert show.week_start == week_start + + # Calling again returns the same show + show2 = db.get_or_create_show(week_start, week_end) + assert show2.id == show.id + + +def test_set_show_tracks(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + + t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First") + t2 = _make_track(2, "2026-03-14T02:00:00+00:00", title="Second") + db.upsert_track(t1) + db.upsert_track(t2) + + db.set_show_tracks(show.id, [t1.id, t2.id]) + tracks = db.get_show_tracks(show.id) + assert len(tracks) == 2 + assert tracks[0]["position"] == 1 + assert tracks[0]["title"] == "First" + assert tracks[1]["position"] == 2 + + +def test_set_show_tracks_preserves_existing_positions(db): + """Adding a new track doesn't change existing track positions.""" + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + + t1 = _make_track(1, "2026-03-14T01:00:00+00:00") + db.upsert_track(t1) + db.set_show_tracks(show.id, [t1.id]) + + t2 = _make_track(2, "2026-03-14T02:00:00+00:00") + db.upsert_track(t2) + db.set_show_tracks(show.id, [t1.id, t2.id]) + + tracks = db.get_show_tracks(show.id) + assert tracks[0]["track_id"] == 1 + assert tracks[0]["position"] == 1 + assert tracks[1]["track_id"] == 2 + assert tracks[1]["position"] == 2 + + +def test_get_show_track_by_position(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First") + db.upsert_track(t1) + db.set_show_tracks(show.id, [t1.id]) + + result = db.get_show_track_by_position(show.id, 1) + assert result is not None + assert result["title"] == "First" + + result_missing = db.get_show_track_by_position(show.id, 99) + assert result_missing is None + + +def test_list_shows(db): + s1 = db.get_or_create_show( + datetime(2026, 3, 6, 3, 0, 0, tzinfo=timezone.utc), + datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc), + ) + s2 = db.get_or_create_show( + datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc), + datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc), + ) + shows = db.list_shows(limit=10, offset=0) + assert len(shows) == 2 + # Newest first + assert shows[0].id == s2.id + + +def test_max_position_for_show(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + + assert db.get_max_position(show.id) == 0 + + t1 = _make_track(1, "2026-03-14T01:00:00+00:00") + db.upsert_track(t1) + db.set_show_tracks(show.id, [t1.id]) + assert db.get_max_position(show.id) == 1 + + +def test_remove_show_track(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + + t1 = _make_track(1, "2026-03-14T01:00:00+00:00") + t2 = _make_track(2, "2026-03-14T02:00:00+00:00") + t3 = _make_track(3, "2026-03-14T03:00:00+00:00") + db.upsert_track(t1) + db.upsert_track(t2) + db.upsert_track(t3) + db.set_show_tracks(show.id, [t1.id, t2.id, t3.id]) + + db.remove_show_track(show.id, 2) + tracks = db.get_show_tracks(show.id) + assert len(tracks) == 2 + assert tracks[0]["position"] == 1 + assert tracks[0]["track_id"] == 1 + # Position re-compacted: track 3 moved from position 3 to 2 + assert tracks[1]["position"] == 2 + assert tracks[1]["track_id"] == 3 + + +def test_move_show_track(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + + t1 = _make_track(1, "2026-03-14T01:00:00+00:00") + t2 = _make_track(2, "2026-03-14T02:00:00+00:00") + t3 = _make_track(3, "2026-03-14T03:00:00+00:00") + db.upsert_track(t1) + db.upsert_track(t2) + db.upsert_track(t3) + db.set_show_tracks(show.id, [t1.id, t2.id, t3.id]) + + # Move track 3 from position 3 to position 1 + db.move_show_track(show.id, track_id=3, new_position=1) + tracks = db.get_show_tracks(show.id) + assert tracks[0]["track_id"] == 3 + assert tracks[0]["position"] == 1 + assert tracks[1]["track_id"] == 1 + assert tracks[1]["position"] == 2 + assert tracks[2]["track_id"] == 2 + assert tracks[2]["position"] == 3 + + +def test_add_track_to_show_at_position(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + + t1 = _make_track(1, "2026-03-14T01:00:00+00:00") + t2 = _make_track(2, "2026-03-14T02:00:00+00:00") + t3 = _make_track(3, "2026-03-14T03:00:00+00:00") + db.upsert_track(t1) + db.upsert_track(t2) + db.upsert_track(t3) + db.set_show_tracks(show.id, [t1.id, t2.id]) + + # Insert track 3 at position 2 — pushes track 2 to position 3 + db.add_track_to_show(show.id, track_id=3, position=2) + tracks = db.get_show_tracks(show.id) + assert len(tracks) == 3 + assert tracks[0]["track_id"] == 1 + assert tracks[1]["track_id"] == 3 + assert tracks[2]["track_id"] == 2 + + +def test_has_track_in_show(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + t1 = _make_track(1, "2026-03-14T01:00:00+00:00") + db.upsert_track(t1) + db.set_show_tracks(show.id, [t1.id]) + + assert db.has_track_in_show(show.id, 1) is True + assert db.has_track_in_show(show.id, 999) is False +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_db.py -v` +Expected: FAIL — `AttributeError: 'Database' object has no attribute 'upsert_track'` + +**Step 3: Write the query methods** + +Add these methods to the `Database` class in `src/ntr_fetcher/db.py`: + +```python + def upsert_track(self, track: Track) -> None: + conn = self._connect() + conn.execute( + """INSERT INTO tracks (id, title, artist, permalink_url, artwork_url, duration_ms, license, liked_at, raw_json) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + title=excluded.title, + artist=excluded.artist, + permalink_url=excluded.permalink_url, + artwork_url=excluded.artwork_url, + duration_ms=excluded.duration_ms, + license=excluded.license, + liked_at=excluded.liked_at, + raw_json=excluded.raw_json + """, + ( + track.id, + track.title, + track.artist, + track.permalink_url, + track.artwork_url, + track.duration_ms, + track.license, + track.liked_at.isoformat(), + track.raw_json, + ), + ) + conn.commit() + conn.close() + + def get_track(self, track_id: int) -> Track | None: + conn = self._connect() + row = conn.execute("SELECT * FROM tracks WHERE id = ?", (track_id,)).fetchone() + conn.close() + if row is None: + return None + return Track( + id=row["id"], + title=row["title"], + artist=row["artist"], + permalink_url=row["permalink_url"], + artwork_url=row["artwork_url"], + duration_ms=row["duration_ms"], + license=row["license"], + liked_at=datetime.fromisoformat(row["liked_at"]), + raw_json=row["raw_json"], + ) + + def get_or_create_show(self, week_start: datetime, week_end: datetime) -> Show: + conn = self._connect() + start_iso = week_start.isoformat() + end_iso = week_end.isoformat() + row = conn.execute( + "SELECT * FROM shows WHERE week_start = ? AND week_end = ?", + (start_iso, end_iso), + ).fetchone() + if row is not None: + conn.close() + return Show( + id=row["id"], + week_start=datetime.fromisoformat(row["week_start"]), + week_end=datetime.fromisoformat(row["week_end"]), + created_at=datetime.fromisoformat(row["created_at"]), + ) + now_iso = datetime.now(timezone.utc).isoformat() + cursor = conn.execute( + "INSERT INTO shows (week_start, week_end, created_at) VALUES (?, ?, ?)", + (start_iso, end_iso, now_iso), + ) + conn.commit() + show = Show( + id=cursor.lastrowid, + week_start=week_start, + week_end=week_end, + created_at=datetime.fromisoformat(now_iso), + ) + conn.close() + return show + + def get_show_tracks(self, show_id: int) -> list[dict]: + conn = self._connect() + rows = conn.execute( + """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url, + t.artwork_url, t.duration_ms, t.liked_at + FROM show_tracks st + JOIN tracks t ON st.track_id = t.id + WHERE st.show_id = ? + ORDER BY st.position""", + (show_id,), + ).fetchall() + conn.close() + return [dict(row) for row in rows] + + def get_show_track_by_position(self, show_id: int, position: int) -> dict | None: + conn = self._connect() + row = conn.execute( + """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url, + t.artwork_url, t.duration_ms, t.liked_at + FROM show_tracks st + JOIN tracks t ON st.track_id = t.id + WHERE st.show_id = ? AND st.position = ?""", + (show_id, position), + ).fetchone() + conn.close() + return dict(row) if row else None + + def set_show_tracks(self, show_id: int, track_ids: list[int]) -> None: + """Set show tracks, preserving positions of already-assigned tracks.""" + conn = self._connect() + existing = conn.execute( + "SELECT track_id, position FROM show_tracks WHERE show_id = ? ORDER BY position", + (show_id,), + ).fetchall() + existing_map = {row["track_id"]: row["position"] for row in existing} + + max_pos = max(existing_map.values()) if existing_map else 0 + for track_id in track_ids: + if track_id not in existing_map: + max_pos += 1 + conn.execute( + "INSERT OR IGNORE INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)", + (show_id, track_id, max_pos), + ) + conn.commit() + conn.close() + + def get_max_position(self, show_id: int) -> int: + conn = self._connect() + row = conn.execute( + "SELECT COALESCE(MAX(position), 0) as max_pos FROM show_tracks WHERE show_id = ?", + (show_id,), + ).fetchone() + conn.close() + return row["max_pos"] + + def list_shows(self, limit: int = 20, offset: int = 0) -> list[Show]: + conn = self._connect() + rows = conn.execute( + "SELECT * FROM shows ORDER BY week_start DESC LIMIT ? OFFSET ?", + (limit, offset), + ).fetchall() + conn.close() + return [ + Show( + id=row["id"], + week_start=datetime.fromisoformat(row["week_start"]), + week_end=datetime.fromisoformat(row["week_end"]), + created_at=datetime.fromisoformat(row["created_at"]), + ) + for row in rows + ] + + def has_track_in_show(self, show_id: int, track_id: int) -> bool: + conn = self._connect() + row = conn.execute( + "SELECT 1 FROM show_tracks WHERE show_id = ? AND track_id = ?", + (show_id, track_id), + ).fetchone() + conn.close() + return row is not None + + def remove_show_track(self, show_id: int, track_id: int) -> None: + conn = self._connect() + conn.execute( + "DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?", + (show_id, track_id), + ) + # Re-compact positions + rows = conn.execute( + "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position", + (show_id,), + ).fetchall() + for i, row in enumerate(rows, start=1): + conn.execute( + "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?", + (i, show_id, row["track_id"]), + ) + conn.commit() + conn.close() + + def move_show_track(self, show_id: int, track_id: int, new_position: int) -> None: + conn = self._connect() + # Get current ordered list + rows = conn.execute( + "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position", + (show_id,), + ).fetchall() + ordered = [row["track_id"] for row in rows] + + # Remove and reinsert at new position + ordered.remove(track_id) + ordered.insert(new_position - 1, track_id) + + for i, tid in enumerate(ordered, start=1): + conn.execute( + "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?", + (i, show_id, tid), + ) + conn.commit() + conn.close() + + def add_track_to_show(self, show_id: int, track_id: int, position: int | None = None) -> None: + conn = self._connect() + if position is None: + max_pos = self.get_max_position(show_id) + conn.execute( + "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)", + (show_id, track_id, max_pos + 1), + ) + else: + # Shift existing tracks at >= position up by 1 + conn.execute( + "UPDATE show_tracks SET position = position + 1 WHERE show_id = ? AND position >= ?", + (show_id, position), + ) + conn.execute( + "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)", + (show_id, track_id, position), + ) + conn.commit() + conn.close() +``` + +Note: `from datetime import datetime, timezone` and `from ntr_fetcher.models import Track, Show, ShowTrack` are already imported at the top of `db.py`. + +**Step 4: Run tests to verify they pass** + +Run: `pytest tests/test_db.py -v` +Expected: All tests pass. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/db.py tests/test_db.py +git commit -m "feat: add database query methods for tracks, shows, and show_tracks" +``` + +--- + +## Task 7: SoundCloud Client — `client_id` Extraction + +**Files:** +- Create: `src/ntr_fetcher/soundcloud.py` +- Create: `tests/test_soundcloud.py` + +**Step 1: Write the failing test** + +```python +# tests/test_soundcloud.py +import pytest +import httpx + +from ntr_fetcher.soundcloud import SoundCloudClient + + +FAKE_HTML = """ + +""" + +FAKE_HTML_EXPIRING = """ + +""" + + +@pytest.mark.asyncio +async def test_extract_client_id(httpx_mock): + httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML) + client = SoundCloudClient() + client_id = await client._extract_client_id() + assert client_id == "test_client_id_abc123" + + +@pytest.mark.asyncio +async def test_extract_client_id_caches(httpx_mock): + httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML) + client = SoundCloudClient() + id1 = await client._extract_client_id() + id2 = await client._extract_client_id() + assert id1 == id2 + assert len(httpx_mock.get_requests()) == 1 # Only fetched once + + +@pytest.mark.asyncio +async def test_extract_client_id_bad_html(httpx_mock): + httpx_mock.add_response(url="https://soundcloud.com", text="no hydration here") + client = SoundCloudClient() + with pytest.raises(ValueError, match="client_id"): + await client._extract_client_id() +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_soundcloud.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Write the implementation** + +```python +# src/ntr_fetcher/soundcloud.py +import json +import logging +import re + +import httpx + +logger = logging.getLogger(__name__) + +SOUNDCLOUD_BASE = "https://soundcloud.com" +API_BASE = "https://api-v2.soundcloud.com" +HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL) + + +class SoundCloudClient: + def __init__(self, http_client: httpx.AsyncClient | None = None): + self._http = http_client or httpx.AsyncClient(timeout=15.0) + self._client_id: str | None = None + + async def _extract_client_id(self) -> str: + if self._client_id is not None: + return self._client_id + + resp = await self._http.get(SOUNDCLOUD_BASE) + resp.raise_for_status() + match = HYDRATION_PATTERN.search(resp.text) + if not match: + raise ValueError("Could not find __sc_hydration in SoundCloud HTML — cannot extract client_id") + + hydration = json.loads(match.group(1)) + for entry in hydration: + if entry.get("hydratable") == "apiClient": + self._client_id = entry["data"]["id"] + is_expiring = entry["data"].get("isExpiring", False) + if is_expiring: + logger.warning("SoundCloud client_id is marked as expiring") + return self._client_id + + raise ValueError("No apiClient entry in __sc_hydration — cannot extract client_id") + + def invalidate_client_id(self) -> None: + self._client_id = None + + async def close(self) -> None: + await self._http.aclose() +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_soundcloud.py -v` +Expected: 3 passed. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py +git commit -m "feat: add SoundCloud client with client_id extraction" +``` + +--- + +## Task 8: SoundCloud Client — Resolve User & Fetch Likes + +**Files:** +- Modify: `src/ntr_fetcher/soundcloud.py` +- Modify: `tests/test_soundcloud.py` + +**Step 1: Write the failing tests** + +Append to `tests/test_soundcloud.py`: + +```python +import json as json_mod +from datetime import datetime, timezone + +from ntr_fetcher.models import Track + + +FAKE_RESOLVE_RESPONSE = {"id": 206979918, "kind": "user", "username": "NICKtheRAT"} + +FAKE_LIKES_RESPONSE = { + "collection": [ + { + "created_at": "2026-03-09T02:25:43Z", + "kind": "like", + "track": { + "id": 12345, + "title": "Test Track", + "permalink_url": "https://soundcloud.com/artist/test-track", + "duration": 180000, + "full_duration": 180000, + "genre": "Electronic", + "tag_list": "", + "created_at": "2026-03-01T00:00:00Z", + "description": "", + "artwork_url": "https://i1.sndcdn.com/artworks-abc-large.jpg", + "license": "cc-by", + "user": { + "id": 999, + "username": "TestArtist", + "permalink_url": "https://soundcloud.com/testartist", + }, + "media": {"transcodings": []}, + }, + } + ], + "next_href": None, +} + + +@pytest.mark.asyncio +async def test_resolve_user(httpx_mock): + httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML) + httpx_mock.add_response( + url=re.compile(r"https://api-v2\.soundcloud\.com/resolve.*"), + json=FAKE_RESOLVE_RESPONSE, + ) + client = SoundCloudClient() + user_id = await client.resolve_user("nicktherat") + assert user_id == 206979918 + + +@pytest.mark.asyncio +async def test_fetch_likes(httpx_mock): + httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML) + httpx_mock.add_response( + url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"), + json=FAKE_LIKES_RESPONSE, + ) + client = SoundCloudClient() + tracks = await client.fetch_likes( + user_id=206979918, + since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc), + until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc), + ) + assert len(tracks) == 1 + assert tracks[0].title == "Test Track" + assert tracks[0].artist == "TestArtist" + assert tracks[0].id == 12345 + + +@pytest.mark.asyncio +async def test_fetch_likes_filters_outside_range(httpx_mock): + httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML) + httpx_mock.add_response( + url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"), + json=FAKE_LIKES_RESPONSE, + ) + client = SoundCloudClient() + # Range that excludes the track (liked at 2026-03-09) + tracks = await client.fetch_likes( + user_id=206979918, + since=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc), + until=datetime(2026, 3, 12, 0, 0, 0, tzinfo=timezone.utc), + ) + assert len(tracks) == 0 + + +@pytest.mark.asyncio +async def test_fetch_likes_retries_on_401(httpx_mock): + """On 401, client_id is refreshed and the request is retried.""" + httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML) + # First likes call returns 401 + httpx_mock.add_response( + url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"), + status_code=401, + ) + # Re-extraction gets new client_id + httpx_mock.add_response( + url="https://soundcloud.com", + text=FAKE_HTML.replace("test_client_id_abc123", "new_client_id_456"), + ) + # Retry succeeds + httpx_mock.add_response( + url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"), + json=FAKE_LIKES_RESPONSE, + ) + client = SoundCloudClient() + tracks = await client.fetch_likes( + user_id=206979918, + since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc), + until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc), + ) + assert len(tracks) == 1 +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_soundcloud.py -v` +Expected: FAIL — `AttributeError: 'SoundCloudClient' object has no attribute 'resolve_user'` + +**Step 3: Write the implementation** + +Add to `SoundCloudClient` in `src/ntr_fetcher/soundcloud.py`: + +```python + async def _api_get(self, url: str, params: dict | None = None) -> httpx.Response: + """Make an API GET request with automatic client_id injection and 401 retry.""" + client_id = await self._extract_client_id() + params = dict(params or {}) + params["client_id"] = client_id + + for attempt in range(3): + resp = await self._http.get(url, params=params) + if resp.status_code == 401: + logger.warning("Got 401 from SoundCloud API, refreshing client_id (attempt %d)", attempt + 1) + self.invalidate_client_id() + client_id = await self._extract_client_id() + params["client_id"] = client_id + continue + resp.raise_for_status() + return resp + + raise httpx.HTTPStatusError( + "Failed after 3 attempts (401)", + request=resp.request, + response=resp, + ) + + async def resolve_user(self, username: str) -> int: + resp = await self._api_get( + f"{API_BASE}/resolve", + params={"url": f"{SOUNDCLOUD_BASE}/{username}"}, + ) + return resp.json()["id"] + + async def fetch_likes( + self, + user_id: int, + since: datetime, + until: datetime, + limit: int = 50, + ) -> list["Track"]: + from ntr_fetcher.models import Track + + cursor = _build_cursor(until, user_id) + collected: list[Track] = [] + + while True: + params = {"limit": limit} + if cursor: + params["offset"] = cursor + + resp = await self._api_get(f"{API_BASE}/users/{user_id}/likes", params=params) + data = resp.json() + collection = data.get("collection", []) + + if not collection: + break + + stop = False + for item in collection: + liked_at_str = item.get("created_at", "") + liked_at = datetime.fromisoformat(liked_at_str.replace("Z", "+00:00")) + + if liked_at < since: + stop = True + break + + if liked_at > until: + continue + + track_data = item.get("track") + if track_data is None: + continue + + user_data = track_data.get("user", {}) + collected.append( + Track( + id=track_data["id"], + title=track_data["title"], + artist=user_data.get("username", "Unknown"), + permalink_url=track_data["permalink_url"], + artwork_url=track_data.get("artwork_url"), + duration_ms=track_data.get("full_duration", track_data.get("duration", 0)), + license=track_data.get("license", ""), + liked_at=liked_at, + raw_json=json.dumps(track_data), + ) + ) + + if stop: + break + + next_href = data.get("next_href") + if not next_href: + break + + # Extract cursor from next_href + from urllib.parse import urlparse, parse_qs + parsed = urlparse(next_href) + qs = parse_qs(parsed.query) + cursor = qs.get("offset", [None])[0] + if cursor is None: + break + + # Return in chronological order (oldest first) + collected.sort(key=lambda t: t.liked_at) + return collected +``` + +Also add this module-level helper and the missing import at the top of the file: + +```python +from datetime import datetime +from urllib.parse import quote + + +def _build_cursor(until: datetime, user_id: int) -> str: + ts = until.strftime("%Y-%m-%dT%H:%M:%S.000Z") + padded_user = str(user_id).zfill(22) + raw = f"{ts},user-track-likes,000-{padded_user}-99999999999999999999" + return raw +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_soundcloud.py -v` +Expected: All tests pass. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py +git commit -m "feat: add user resolution and likes fetching with 401 retry" +``` + +--- + +## Task 9: Poller & Supervisor + +**Files:** +- Create: `src/ntr_fetcher/poller.py` +- Create: `tests/test_poller.py` + +**Step 1: Write the failing test** + +```python +# tests/test_poller.py +import asyncio +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from ntr_fetcher.poller import Poller +from ntr_fetcher.models import Track + + +def _make_track(id: int, liked_at: str) -> Track: + return Track( + id=id, + title=f"Track {id}", + artist="Artist", + permalink_url=f"https://soundcloud.com/a/t-{id}", + artwork_url=None, + duration_ms=180000, + license="cc-by", + liked_at=datetime.fromisoformat(liked_at), + raw_json="{}", + ) + + +@pytest.mark.asyncio +async def test_poll_once_fetches_and_stores(): + mock_sc = AsyncMock() + mock_sc.resolve_user.return_value = 206979918 + mock_sc.fetch_likes.return_value = [ + _make_track(1, "2026-03-14T01:00:00+00:00"), + _make_track(2, "2026-03-14T02:00:00+00:00"), + ] + + mock_db = MagicMock() + mock_show = MagicMock() + mock_show.id = 1 + mock_db.get_or_create_show.return_value = mock_show + + poller = Poller( + db=mock_db, + soundcloud=mock_sc, + soundcloud_user="nicktherat", + show_day=2, + show_hour=22, + poll_interval=3600, + ) + + await poller.poll_once() + + assert mock_sc.resolve_user.called + assert mock_sc.fetch_likes.called + assert mock_db.upsert_track.call_count == 2 + assert mock_db.set_show_tracks.called + call_args = mock_db.set_show_tracks.call_args + assert call_args[0][0] == 1 # show_id + assert call_args[0][1] == [1, 2] # track_ids in order + + +@pytest.mark.asyncio +async def test_poll_once_full_refresh(): + mock_sc = AsyncMock() + mock_sc.resolve_user.return_value = 206979918 + mock_sc.fetch_likes.return_value = [ + _make_track(1, "2026-03-14T01:00:00+00:00"), + ] + + mock_db = MagicMock() + mock_show = MagicMock() + mock_show.id = 1 + mock_db.get_or_create_show.return_value = mock_show + + poller = Poller( + db=mock_db, + soundcloud=mock_sc, + soundcloud_user="nicktherat", + show_day=2, + show_hour=22, + poll_interval=3600, + ) + + await poller.poll_once(full=True) + + # full=True should still call fetch_likes + assert mock_sc.fetch_likes.called + + +@pytest.mark.asyncio +async def test_supervisor_restarts_poller_on_failure(): + call_count = 0 + + async def failing_poll(): + nonlocal call_count + call_count += 1 + if call_count <= 2: + raise RuntimeError("Simulated failure") + + mock_sc = AsyncMock() + mock_db = MagicMock() + + poller = Poller( + db=mock_db, + soundcloud=mock_sc, + soundcloud_user="nicktherat", + show_day=2, + show_hour=22, + poll_interval=0.01, # Very short for testing + ) + poller.poll_once = failing_poll + + task = asyncio.create_task(poller.run_supervised(restart_delay=0.01)) + await asyncio.sleep(0.2) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert call_count >= 3 # Failed twice, succeeded at least once +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_poller.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Write the implementation** + +```python +# src/ntr_fetcher/poller.py +import asyncio +import logging +from datetime import datetime, timezone + +from ntr_fetcher.db import Database +from ntr_fetcher.soundcloud import SoundCloudClient +from ntr_fetcher.week import get_show_week + +logger = logging.getLogger(__name__) + + +class Poller: + def __init__( + self, + db: Database, + soundcloud: SoundCloudClient, + soundcloud_user: str, + show_day: int, + show_hour: int, + poll_interval: float, + ): + self._db = db + self._sc = soundcloud + self._user = soundcloud_user + self._show_day = show_day + self._show_hour = show_hour + self._poll_interval = poll_interval + self._user_id: int | None = None + self.last_fetch: datetime | None = None + self.alive = True + + async def _get_user_id(self) -> int: + if self._user_id is None: + self._user_id = await self._sc.resolve_user(self._user) + return self._user_id + + async def poll_once(self, full: bool = False) -> None: + user_id = await self._get_user_id() + now = datetime.now(timezone.utc) + week_start, week_end = get_show_week(now, self._show_day, self._show_hour) + show = self._db.get_or_create_show(week_start, week_end) + + tracks = await self._sc.fetch_likes( + user_id=user_id, + since=week_start, + until=week_end, + ) + + for track in tracks: + self._db.upsert_track(track) + + track_ids = [t.id for t in tracks] + self._db.set_show_tracks(show.id, track_ids) + self.last_fetch = datetime.now(timezone.utc) + logger.info("Fetched %d tracks for show %d", len(tracks), show.id) + + async def run_supervised(self, restart_delay: float = 30.0) -> None: + while True: + try: + self.alive = True + await self.poll_once() + await asyncio.sleep(self._poll_interval) + except asyncio.CancelledError: + self.alive = False + raise + except Exception: + self.alive = False + logger.exception("Poller failed, restarting in %.1fs", restart_delay) + await asyncio.sleep(restart_delay) +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_poller.py -v` +Expected: 3 passed. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/poller.py tests/test_poller.py +git commit -m "feat: add poller with supervised restart loop" +``` + +--- + +## Task 10: API Routes + +**Files:** +- Create: `src/ntr_fetcher/api.py` +- Create: `tests/test_api.py` + +**Step 1: Write the failing tests** + +```python +# tests/test_api.py +from datetime import datetime, timezone + +import pytest +from fastapi.testclient import TestClient + +from ntr_fetcher.api import create_app +from ntr_fetcher.db import Database +from ntr_fetcher.models import Track + + +@pytest.fixture +def db(tmp_path): + database = Database(str(tmp_path / "test.db")) + database.initialize() + return database + + +@pytest.fixture +def app(db): + from unittest.mock import AsyncMock, MagicMock + poller = MagicMock() + poller.last_fetch = datetime(2026, 3, 12, 12, 0, 0, tzinfo=timezone.utc) + poller.alive = True + poller.poll_once = AsyncMock() + return create_app(db=db, poller=poller, admin_token="test-token") + + +@pytest.fixture +def client(app): + return TestClient(app) + + +def _seed_show(db): + week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + t1 = Track(1, "Song A", "Artist A", "https://soundcloud.com/a/1", None, 180000, "cc-by", + datetime(2026, 3, 14, 1, 0, 0, tzinfo=timezone.utc), "{}") + t2 = Track(2, "Song B", "Artist B", "https://soundcloud.com/b/2", None, 200000, "cc-by-sa", + datetime(2026, 3, 14, 2, 0, 0, tzinfo=timezone.utc), "{}") + db.upsert_track(t1) + db.upsert_track(t2) + db.set_show_tracks(show.id, [t1.id, t2.id]) + return show + + +def test_health(client): + resp = client.get("/health") + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["poller_alive"] is True + + +def test_playlist(client, db): + _seed_show(db) + resp = client.get("/playlist") + assert resp.status_code == 200 + data = resp.json() + assert len(data["tracks"]) == 2 + assert data["tracks"][0]["position"] == 1 + assert data["tracks"][0]["title"] == "Song A" + + +def test_playlist_by_position(client, db): + _seed_show(db) + resp = client.get("/playlist/2") + assert resp.status_code == 200 + assert resp.json()["title"] == "Song B" + + +def test_playlist_by_position_not_found(client, db): + _seed_show(db) + resp = client.get("/playlist/99") + assert resp.status_code == 404 + + +def test_shows_list(client, db): + _seed_show(db) + resp = client.get("/shows") + assert resp.status_code == 200 + assert len(resp.json()) >= 1 + + +def test_shows_detail(client, db): + show = _seed_show(db) + resp = client.get(f"/shows/{show.id}") + assert resp.status_code == 200 + assert len(resp.json()["tracks"]) == 2 + + +def test_admin_refresh_requires_token(client): + resp = client.post("/admin/refresh") + assert resp.status_code == 401 + + +def test_admin_refresh_with_token(client): + resp = client.post( + "/admin/refresh", + headers={"Authorization": "Bearer test-token"}, + json={"full": False}, + ) + assert resp.status_code == 200 + + +def test_admin_remove_track(client, db): + show = _seed_show(db) + resp = client.delete( + "/admin/tracks/1", + headers={"Authorization": "Bearer test-token"}, + ) + assert resp.status_code == 200 + tracks = db.get_show_tracks(show.id) + assert len(tracks) == 1 +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest tests/test_api.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Write the implementation** + +```python +# src/ntr_fetcher/api.py +import logging +from datetime import datetime, timezone + +from fastapi import FastAPI, HTTPException, Depends, Header +from pydantic import BaseModel + +from ntr_fetcher.db import Database +from ntr_fetcher.week import get_show_week + +logger = logging.getLogger(__name__) + + +class RefreshRequest(BaseModel): + full: bool = False + + +class AddTrackRequest(BaseModel): + soundcloud_url: str | None = None + track_id: int | None = None + position: int | None = None + + +class MoveTrackRequest(BaseModel): + position: int + + +def create_app(db: Database, poller, admin_token: str) -> FastAPI: + app = FastAPI(title="NtR SoundCloud Fetcher") + + def _require_admin(authorization: str = Header(None)): + if authorization is None or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Missing or invalid token") + token = authorization.removeprefix("Bearer ") + if token != admin_token: + raise HTTPException(status_code=401, detail="Invalid token") + + def _current_show(): + now = datetime.now(timezone.utc) + week_start, week_end = get_show_week(now, show_day=2, show_hour=22) + return db.get_or_create_show(week_start, week_end) + + @app.get("/health") + def health(): + show = _current_show() + tracks = db.get_show_tracks(show.id) + return { + "status": "ok", + "poller_alive": poller.alive, + "last_fetch": poller.last_fetch.isoformat() if poller.last_fetch else None, + "current_week_track_count": len(tracks), + } + + @app.get("/playlist") + def playlist(): + show = _current_show() + tracks = db.get_show_tracks(show.id) + return { + "show_id": show.id, + "week_start": show.week_start.isoformat(), + "week_end": show.week_end.isoformat(), + "tracks": tracks, + } + + @app.get("/playlist/{position}") + def playlist_track(position: int): + show = _current_show() + track = db.get_show_track_by_position(show.id, position) + if track is None: + raise HTTPException(status_code=404, detail=f"No track at position {position}") + return track + + @app.get("/shows") + def list_shows(limit: int = 20, offset: int = 0): + shows = db.list_shows(limit=limit, offset=offset) + return [ + { + "id": s.id, + "week_start": s.week_start.isoformat(), + "week_end": s.week_end.isoformat(), + "created_at": s.created_at.isoformat(), + } + for s in shows + ] + + @app.get("/shows/{show_id}") + def show_detail(show_id: int): + shows = db.list_shows(limit=1000, offset=0) + show = next((s for s in shows if s.id == show_id), None) + if show is None: + raise HTTPException(status_code=404, detail="Show not found") + tracks = db.get_show_tracks(show.id) + return { + "show_id": show.id, + "week_start": show.week_start.isoformat(), + "week_end": show.week_end.isoformat(), + "tracks": tracks, + } + + @app.post("/admin/refresh") + async def admin_refresh(body: RefreshRequest = RefreshRequest(), _=Depends(_require_admin)): + await poller.poll_once(full=body.full) + show = _current_show() + tracks = db.get_show_tracks(show.id) + return {"status": "refreshed", "track_count": len(tracks)} + + @app.post("/admin/tracks") + def admin_add_track(body: AddTrackRequest, _=Depends(_require_admin)): + show = _current_show() + if body.track_id is not None: + db.add_track_to_show(show.id, body.track_id, body.position) + return {"status": "added"} + raise HTTPException(status_code=400, detail="Provide track_id") + + @app.delete("/admin/tracks/{track_id}") + def admin_remove_track(track_id: int, _=Depends(_require_admin)): + show = _current_show() + if not db.has_track_in_show(show.id, track_id): + raise HTTPException(status_code=404, detail="Track not in current show") + db.remove_show_track(show.id, track_id) + return {"status": "removed"} + + @app.put("/admin/tracks/{track_id}/position") + def admin_move_track(track_id: int, body: MoveTrackRequest, _=Depends(_require_admin)): + show = _current_show() + if not db.has_track_in_show(show.id, track_id): + raise HTTPException(status_code=404, detail="Track not in current show") + db.move_show_track(show.id, track_id, body.position) + return {"status": "moved"} + + return app +``` + +**Step 4: Run test to verify it passes** + +Run: `pytest tests/test_api.py -v` +Expected: All tests pass. + +**Step 5: Commit** + +```bash +git add src/ntr_fetcher/api.py tests/test_api.py +git commit -m "feat: add FastAPI routes for playlist, shows, admin, and health" +``` + +--- + +## Task 11: Main Entry Point + +**Files:** +- Create: `src/ntr_fetcher/main.py` + +**Step 1: Write the entry point** + +```python +# src/ntr_fetcher/main.py +import asyncio +import logging + +import uvicorn + +from ntr_fetcher.api import create_app +from ntr_fetcher.config import Settings +from ntr_fetcher.db import Database +from ntr_fetcher.poller import Poller +from ntr_fetcher.soundcloud import SoundCloudClient + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +def run() -> None: + settings = Settings() + + db = Database(settings.db_path) + db.initialize() + logger.info("Database initialized at %s", settings.db_path) + + sc = SoundCloudClient() + poller = Poller( + db=db, + soundcloud=sc, + soundcloud_user=settings.soundcloud_user, + show_day=settings.show_day, + show_hour=settings.show_hour, + poll_interval=settings.poll_interval_seconds, + ) + + app = create_app(db=db, poller=poller, admin_token=settings.admin_token) + + @app.on_event("startup") + async def start_poller(): + logger.info("Starting poller (interval=%ds)", settings.poll_interval_seconds) + asyncio.create_task(poller.run_supervised()) + + @app.on_event("shutdown") + async def shutdown(): + logger.info("Shutting down") + await sc.close() + + uvicorn.run(app, host=settings.host, port=settings.port) + + +if __name__ == "__main__": + run() +``` + +**Step 2: Verify it at least imports** + +Run: `python -c "from ntr_fetcher.main import run; print('OK')"` +Expected: `OK` + +**Step 3: Commit** + +```bash +git add src/ntr_fetcher/main.py +git commit -m "feat: add main entry point wiring API server and poller" +``` + +--- + +## Task 12: README + +**Files:** +- Create: `README.md` + +**Step 1: Write the README** + +```markdown +# NtR SoundCloud Fetcher + +Fetches SoundCloud likes from NicktheRat's profile, builds weekly playlists +aligned to the Wednesday 22:00 ET show schedule, and serves them via a JSON +API. + +## Quick Start + +```bash +pip install -e ".[dev]" +export NTR_ADMIN_TOKEN="your-secret-here" +ntr-fetcher +``` + +The API starts at `http://127.0.0.1:8000`. + +## API + +| Endpoint | Description | +|----------|-------------| +| `GET /playlist` | Current week's playlist | +| `GET /playlist/{n}` | Track at position n | +| `GET /shows` | List all shows | +| `GET /shows/{id}` | Specific show's playlist | +| `GET /health` | Service health check | +| `POST /admin/refresh` | Trigger SoundCloud fetch (token required) | + +## Configuration + +Environment variables (prefix `NTR_`): + +| Variable | Default | Description | +|----------|---------|-------------| +| `NTR_PORT` | `8000` | API port | +| `NTR_HOST` | `127.0.0.1` | Bind address | +| `NTR_DB_PATH` | `./ntr_fetcher.db` | SQLite path | +| `NTR_POLL_INTERVAL_SECONDS` | `3600` | Poll frequency | +| `NTR_ADMIN_TOKEN` | (required) | Admin bearer token | +| `NTR_SOUNDCLOUD_USER` | `nicktherat` | SoundCloud user | + +## Development + +```bash +pip install -e ".[dev]" +pytest +``` +``` + +**Step 2: Commit** + +```bash +git add README.md +git commit -m "docs: add README with quick start and API reference" +``` + +--- + +## Task 13: Run Full Test Suite + +**Step 1: Run all tests** + +Run: `pytest -v` +Expected: All tests pass. + +**Step 2: Run ruff linter** + +Run: `ruff check src/ tests/` +Expected: No errors (or fix any that appear). + +**Step 3: Final commit if any fixes were needed** + +```bash +git add -A +git commit -m "fix: address linting issues" +```