# NtR SoundCloud Fetcher Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a Python service that polls NicktheRat's SoundCloud likes, builds weekly playlists, and serves them via a JSON API.

**Architecture:** Single async Python process: FastAPI serves the JSON API, an async background task polls SoundCloud hourly, and a supervisor loop restarts the poller on failure. SQLite stores all state. See `docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md` for the full design.

**Tech Stack:** Python 3.11+, FastAPI, uvicorn, httpx, pydantic, sqlite3, zoneinfo.

**Reference docs:**
- Design: `docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md`
- SoundCloud API: `docs/soundcloud-likes-api.md`

---

## Task 1: Project Scaffolding

**Files:**
- Create: `pyproject.toml`
- Create: `src/ntr_fetcher/__init__.py`
- Create: `tests/__init__.py`
- Create: `tests/conftest.py`

**Step 1: Create `pyproject.toml`**

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ntr-fetcher"
version = "0.1.0"
description = "SoundCloud likes fetcher for Nick the Rat Radio"
requires-python = ">=3.11"
dependencies = [
    "fastapi",
    "uvicorn[standard]",
    "httpx",
    "pydantic-settings",
]

[project.optional-dependencies]
dev = [
    "pytest",
    "pytest-asyncio",
    "pytest-httpx",
    "ruff",
]

[project.scripts]
ntr-fetcher = "ntr_fetcher.main:run"

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

[tool.ruff]
target-version = "py311"
src = ["src"]
```

**Step 2: Create package init**

```python
# src/ntr_fetcher/__init__.py
```

(Empty file; it just marks the package.)

**Step 3: Create test scaffolding**

```python
# tests/__init__.py
```

```python
# tests/conftest.py
import pytest
```

**Step 4: Install the project in dev mode**

Run: `pip install -e ".[dev]"`
Expected: Successfully installed with all dependencies.

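One way to confirm the editable install before moving on is to check that each top-level module resolves. This is only a minimal sketch: `missing_modules` is a hypothetical helper, not part of the plan, and the module list is an assumption derived from the dependencies declared in `pyproject.toml`.

```python
from importlib.util import find_spec


def missing_modules(names: list[str]) -> list[str]:
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    # Assumed import names for the declared dependencies plus the package itself
    required = ["ntr_fetcher", "fastapi", "uvicorn", "httpx", "pydantic_settings", "pytest"]
    missing = missing_modules(required)
    print("missing:", ", ".join(missing) if missing else "none")
```

If anything is reported as missing, re-running the install command from Step 4 is the likely fix.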
**Step 5: Verify pytest runs**

Run: `pytest --co`
Expected: no tests are collected yet (collected 0 items); pytest exits with code 5 (no tests collected), which is fine at this stage.

**Step 6: Commit**

```bash
git add pyproject.toml src/ tests/
git commit -m "scaffold: project structure with pyproject.toml and test config"
```

---

## Task 2: Configuration Module

**Files:**
- Create: `src/ntr_fetcher/config.py`
- Create: `tests/test_config.py`

**Step 1: Write the failing test**

```python
# tests/test_config.py
import pytest
from pydantic import ValidationError

from ntr_fetcher.config import Settings


def test_settings_defaults():
    settings = Settings(admin_token="test-secret")
    assert settings.port == 8000
    assert settings.host == "127.0.0.1"
    assert settings.db_path == "./ntr_fetcher.db"
    assert settings.poll_interval_seconds == 3600
    assert settings.soundcloud_user == "nicktherat"
    assert settings.show_day == 2
    assert settings.show_hour == 22


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("NTR_PORT", "9090")
    monkeypatch.setenv("NTR_HOST", "0.0.0.0")
    monkeypatch.setenv("NTR_ADMIN_TOKEN", "my-secret")
    monkeypatch.setenv("NTR_SOUNDCLOUD_USER", "someoneelse")
    settings = Settings()
    assert settings.port == 9090
    assert settings.host == "0.0.0.0"
    assert settings.admin_token == "my-secret"
    assert settings.soundcloud_user == "someoneelse"


def test_settings_admin_token_required(monkeypatch):
    # A token set in the surrounding environment must not satisfy the field
    monkeypatch.delenv("NTR_ADMIN_TOKEN", raising=False)
    with pytest.raises(ValidationError):
        Settings()
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_config.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'ntr_fetcher.config'`

**Step 3: Write the implementation**

```python
# src/ntr_fetcher/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    model_config = {"env_prefix": "NTR_"}

    port: int = 8000
    host: str = "127.0.0.1"
    db_path: str = "./ntr_fetcher.db"
    poll_interval_seconds: int = 3600
    admin_token: str  # required, no default
    soundcloud_user: str = "nicktherat"
    show_day: int = 2  # Wednesday (Monday=0)
    show_hour: int = 22
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_config.py -v`
Expected:
3 passed.

**Step 5: Commit**

```bash
git add src/ntr_fetcher/config.py tests/test_config.py
git commit -m "feat: add configuration module with pydantic-settings"
```

---

## Task 3: Week Boundary Computation

**Files:**
- Create: `src/ntr_fetcher/week.py`
- Create: `tests/test_week.py`

This module computes the Wednesday 22:00 Eastern boundaries in UTC. It must handle EST/EDT transitions correctly. In 2026, DST begins on Sunday March 8, so the Wednesdays on either side of it are March 4 (EST) and March 11 (EDT).

**Step 1: Write the failing tests**

```python
# tests/test_week.py
from datetime import datetime, timezone

from ntr_fetcher.week import get_show_week, Show_DAY_DEFAULT, SHOW_HOUR_DEFAULT


def test_mid_week_thursday():
    """Thursday should belong to the show that started the previous Wednesday."""
    # Thursday March 12, 2026 15:00 UTC (11:00 AM EDT)
    now = datetime(2026, 3, 12, 15, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Show started Wed March 11 22:00 EDT = March 12 02:00 UTC (DST active since March 8)
    assert start == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
    # Show ends next Wed March 18 22:00 EDT = March 19 02:00 UTC
    assert end == datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)


def test_wednesday_before_show():
    """Wednesday before 22:00 ET belongs to the previous week's show."""
    # Wednesday March 11, 2026 20:00 UTC (16:00 EDT, before the 22:00 EDT show)
    now = datetime(2026, 3, 11, 20, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Previous Wed was March 4 22:00 EST = March 5 03:00 UTC (before DST)
    assert start == datetime(2026, 3, 5, 3, 0, 0, tzinfo=timezone.utc)
    # Ends this Wed March 11 22:00 EDT = March 12 02:00 UTC (after DST)
    assert end == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)


def test_wednesday_after_show_starts():
    """Wednesday at or after 22:00 ET belongs to the new week."""
    # Wednesday March 11, 2026 at 22:30 EDT = March 12 02:30 UTC
    now = datetime(2026, 3, 12, 2, 30, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    assert start == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
    assert end == datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)


def test_est_period_no_dst():
    """January: firmly in EST (UTC-5)."""
    # Thursday Jan 15, 2026 12:00 UTC
    now = datetime(2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Wed Jan 14 22:00 EST = Jan 15 03:00 UTC
    assert start == datetime(2026, 1, 15, 3, 0, 0, tzinfo=timezone.utc)
    # Wed Jan 21 22:00 EST = Jan 22 03:00 UTC
    assert end == datetime(2026, 1, 22, 3, 0, 0, tzinfo=timezone.utc)
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_week.py -v`
Expected: FAIL with `ModuleNotFoundError`

**Step 3: Write the implementation**

```python
# src/ntr_fetcher/week.py
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")
Show_DAY_DEFAULT = 2  # Wednesday (Monday=0)
SHOW_HOUR_DEFAULT = 22


def get_show_week(
    now_utc: datetime,
    show_day: int = Show_DAY_DEFAULT,
    show_hour: int = SHOW_HOUR_DEFAULT,
) -> tuple[datetime, datetime]:
    """Return (week_start_utc, week_end_utc) for the show week containing now_utc.

    The week starts at show_day at show_hour Eastern Time and runs for 7 days.
    """
    now_et = now_utc.astimezone(EASTERN)

    # Find the most recent show_day at show_hour that is <= now
    days_since_show_day = (now_et.weekday() - show_day) % 7
    candidate_date = now_et.date() - timedelta(days=days_since_show_day)
    candidate = datetime(
        candidate_date.year,
        candidate_date.month,
        candidate_date.day,
        show_hour,
        0,
        0,
        tzinfo=EASTERN,
    )

    if candidate > now_et:
        # We're on the show day but before the show hour, so go back a week
        candidate -= timedelta(days=7)

    # Adding 7 days is wall-clock arithmetic in Eastern time, so the end
    # stays at show_hour local even when a DST change falls inside the week
    week_start_utc = candidate.astimezone(timezone.utc)
    week_end_utc = (candidate + timedelta(days=7)).astimezone(timezone.utc)
    return week_start_utc, week_end_utc
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_week.py -v`
Expected: 4 passed.

**Step 5: Commit**

```bash
git add src/ntr_fetcher/week.py tests/test_week.py
git commit -m "feat: add week boundary computation with DST handling"
```

---

## Task 4: Models

**Files:**
- Create: `src/ntr_fetcher/models.py`

Simple dataclasses for passing data between layers. No test needed: these are pure data containers validated by type checkers and used transitively in other tests.

**Step 1: Write the models**

```python
# src/ntr_fetcher/models.py
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Track:
    id: int
    title: str
    artist: str
    permalink_url: str
    artwork_url: str | None
    duration_ms: int
    license: str
    liked_at: datetime
    raw_json: str


@dataclass(frozen=True)
class Show:
    id: int
    week_start: datetime
    week_end: datetime
    created_at: datetime


@dataclass(frozen=True)
class ShowTrack:
    show_id: int
    track_id: int
    position: int
```

**Step 2: Commit**

```bash
git add src/ntr_fetcher/models.py
git commit -m "feat: add data models for Track, Show, ShowTrack"
```

---

## Task 5: Database Module - Schema & Migrations

**Files:**
- Create: `src/ntr_fetcher/db.py`
- Create: `tests/test_db.py`

This is a larger module.
We'll build it in stages: schema first, then queries.

**Step 1: Write the failing test for schema creation**

```python
# tests/test_db.py
import sqlite3

import pytest

from ntr_fetcher.db import Database


@pytest.fixture
def db(tmp_path):
    db_path = str(tmp_path / "test.db")
    database = Database(db_path)
    database.initialize()
    return database


def test_tables_created(db):
    conn = sqlite3.connect(db.path)
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    tables = [row[0] for row in cursor.fetchall()]
    conn.close()
    assert "tracks" in tables
    assert "shows" in tables
    assert "show_tracks" in tables


def test_initialize_idempotent(db):
    """Calling initialize twice doesn't raise."""
    db.initialize()
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_db.py::test_tables_created -v`
Expected: FAIL with `ModuleNotFoundError`

**Step 3: Write the schema initialization**

```python
# src/ntr_fetcher/db.py
import sqlite3
from datetime import datetime, timezone

from ntr_fetcher.models import Track, Show, ShowTrack

SCHEMA = """
CREATE TABLE IF NOT EXISTS tracks (
    id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    artist TEXT NOT NULL,
    permalink_url TEXT NOT NULL,
    artwork_url TEXT,
    duration_ms INTEGER NOT NULL,
    license TEXT NOT NULL DEFAULT '',
    liked_at TEXT NOT NULL,
    raw_json TEXT NOT NULL DEFAULT '{}'
);

CREATE TABLE IF NOT EXISTS shows (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    week_start TEXT NOT NULL,
    week_end TEXT NOT NULL,
    created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS show_tracks (
    show_id INTEGER NOT NULL REFERENCES shows(id),
    track_id INTEGER NOT NULL REFERENCES tracks(id),
    position INTEGER NOT NULL,
    UNIQUE(show_id, track_id)
);

CREATE INDEX IF NOT EXISTS idx_show_tracks_show_id ON show_tracks(show_id);
CREATE INDEX IF NOT EXISTS idx_tracks_liked_at ON tracks(liked_at);
"""


class Database:
    def __init__(self, path: str):
        self.path = path

    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.row_factory = sqlite3.Row
        return conn

    def initialize(self) -> None:
        conn = self._connect()
        conn.executescript(SCHEMA)
        conn.close()
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_db.py -v`
Expected: 2 passed.

**Step 5: Commit**

```bash
git add src/ntr_fetcher/db.py tests/test_db.py
git commit -m "feat: add database module with schema initialization"
```

---

## Task 6: Database Module - Queries

**Files:**
- Modify: `src/ntr_fetcher/db.py`
- Modify: `tests/test_db.py`

Add methods for all the CRUD operations the service needs.

**Step 1: Write the failing tests**

Append to `tests/test_db.py`:

```python
from datetime import datetime, timezone

from ntr_fetcher.models import Track


def _make_track(id: int, liked_at: str, title: str = "Test", artist: str = "Artist") -> Track:
    return Track(
        id=id,
        title=title,
        artist=artist,
        permalink_url=f"https://soundcloud.com/test/track-{id}",
        artwork_url=None,
        duration_ms=180000,
        license="cc-by",
        liked_at=datetime.fromisoformat(liked_at),
        raw_json="{}",
    )


def test_upsert_track(db):
    track = _make_track(100, "2026-03-10T12:00:00+00:00")
    db.upsert_track(track)
    result = db.get_track(100)
    assert result is not None
    assert result.title == "Test"


def test_upsert_track_updates_existing(db):
    track1 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Original")
    db.upsert_track(track1)
    track2 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Updated")
    db.upsert_track(track2)
    result = db.get_track(100)
    assert result.title == "Updated"


def test_get_or_create_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    assert show.id is not None
    assert show.week_start == week_start
    # Calling again returns the same show
    show2 = db.get_or_create_show(week_start, week_end)
    assert show2.id == show.id


def test_set_show_tracks(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00", title="Second")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["position"] == 1
    assert tracks[0]["title"] == "First"
    assert tracks[1]["position"] == 2


def test_set_show_tracks_preserves_existing_positions(db):
    """Adding a new track doesn't change existing track positions."""
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    db.upsert_track(t2)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    tracks = db.get_show_tracks(show.id)
    assert tracks[0]["track_id"] == 1
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 2
    assert tracks[1]["position"] == 2


def test_get_show_track_by_position(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    result = db.get_show_track_by_position(show.id, 1)
    assert result is not None
    assert result["title"] == "First"
    result_missing = db.get_show_track_by_position(show.id, 99)
    assert result_missing is None


def test_list_shows(db):
    s1 = db.get_or_create_show(
        datetime(2026, 3, 6, 3, 0, 0, tzinfo=timezone.utc),
        datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc),
    )
    s2 = db.get_or_create_show(
        datetime(2026,
                 3, 13, 2, 0, 0, tzinfo=timezone.utc),
        datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc),
    )
    shows = db.list_shows(limit=10, offset=0)
    assert len(shows) == 2
    # Newest first
    assert shows[0].id == s2.id


def test_max_position_for_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    assert db.get_max_position(show.id) == 0
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    assert db.get_max_position(show.id) == 1


def test_remove_show_track(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])
    db.remove_show_track(show.id, 2)
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["position"] == 1
    assert tracks[0]["track_id"] == 1
    # Position re-compacted: track 3 moved from position 3 to 2
    assert tracks[1]["position"] == 2
    assert tracks[1]["track_id"] == 3


def test_move_show_track(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])
    # Move track 3 from position 3 to position 1
    db.move_show_track(show.id, track_id=3, new_position=1)
    tracks = db.get_show_tracks(show.id)
    assert tracks[0]["track_id"] == 3
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 1
    assert tracks[1]["position"] == 2
    assert tracks[2]["track_id"] == 2
    assert tracks[2]["position"] == 3


def test_add_track_to_show_at_position(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    # Insert track 3 at position 2, which pushes track 2 to position 3
    db.add_track_to_show(show.id, track_id=3, position=2)
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 3
    assert tracks[0]["track_id"] == 1
    assert tracks[1]["track_id"] == 3
    assert tracks[2]["track_id"] == 2


def test_has_track_in_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    assert db.has_track_in_show(show.id, 1) is True
    assert db.has_track_in_show(show.id, 999) is False
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_db.py -v`
Expected: FAIL with `AttributeError: 'Database' object has no attribute 'upsert_track'`

**Step 3: Write the query methods**

Add these methods to the `Database` class in `src/ntr_fetcher/db.py`:

```python
    def upsert_track(self, track: Track) -> None:
        conn = self._connect()
        conn.execute(
            """INSERT INTO tracks (id, title, artist, permalink_url, artwork_url,
                                   duration_ms, license, liked_at, raw_json)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(id) DO UPDATE SET
                   title=excluded.title,
                   artist=excluded.artist,
                   permalink_url=excluded.permalink_url,
                   artwork_url=excluded.artwork_url,
                   duration_ms=excluded.duration_ms,
                   license=excluded.license,
                   liked_at=excluded.liked_at,
                   raw_json=excluded.raw_json
            """,
            (
                track.id,
                track.title,
                track.artist,
                track.permalink_url,
                track.artwork_url,
                track.duration_ms,
                track.license,
                track.liked_at.isoformat(),
                track.raw_json,
            ),
        )
        conn.commit()
        conn.close()

    def get_track(self, track_id: int) -> Track | None:
        conn = self._connect()
        row = conn.execute("SELECT * FROM tracks WHERE id = ?", (track_id,)).fetchone()
        conn.close()
        if row is None:
            return None
        return Track(
            id=row["id"],
            title=row["title"],
            artist=row["artist"],
            permalink_url=row["permalink_url"],
            artwork_url=row["artwork_url"],
            duration_ms=row["duration_ms"],
            license=row["license"],
            liked_at=datetime.fromisoformat(row["liked_at"]),
            raw_json=row["raw_json"],
        )

    def get_or_create_show(self, week_start: datetime, week_end: datetime) -> Show:
        conn = self._connect()
        start_iso = week_start.isoformat()
        end_iso = week_end.isoformat()
        row = conn.execute(
            "SELECT * FROM shows WHERE week_start = ? AND week_end = ?",
            (start_iso, end_iso),
        ).fetchone()
        if row is not None:
            conn.close()
            return Show(
                id=row["id"],
                week_start=datetime.fromisoformat(row["week_start"]),
                week_end=datetime.fromisoformat(row["week_end"]),
                created_at=datetime.fromisoformat(row["created_at"]),
            )
        now_iso = datetime.now(timezone.utc).isoformat()
        cursor = conn.execute(
            "INSERT INTO shows (week_start, week_end, created_at) VALUES (?, ?, ?)",
            (start_iso, end_iso, now_iso),
        )
        conn.commit()
        show = Show(
            id=cursor.lastrowid,
            week_start=week_start,
            week_end=week_end,
            created_at=datetime.fromisoformat(now_iso),
        )
        conn.close()
        return show

    def get_show_tracks(self, show_id: int) -> list[dict]:
        conn = self._connect()
        rows = conn.execute(
            """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url,
                      t.artwork_url, t.duration_ms, t.liked_at
               FROM show_tracks st
               JOIN tracks t ON st.track_id = t.id
               WHERE st.show_id = ?
               ORDER BY st.position""",
            (show_id,),
        ).fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def get_show_track_by_position(self, show_id: int, position: int) -> dict | None:
        conn = self._connect()
        row = conn.execute(
            """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url,
                      t.artwork_url, t.duration_ms, t.liked_at
               FROM show_tracks st
               JOIN tracks t ON st.track_id = t.id
               WHERE st.show_id = ? AND st.position = ?""",
            (show_id, position),
        ).fetchone()
        conn.close()
        return dict(row) if row else None

    def set_show_tracks(self, show_id: int, track_ids: list[int]) -> None:
        """Set show tracks, preserving positions of already-assigned tracks."""
        conn = self._connect()
        existing = conn.execute(
            "SELECT track_id, position FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        existing_map = {row["track_id"]: row["position"] for row in existing}
        max_pos = max(existing_map.values()) if existing_map else 0
        for track_id in track_ids:
            if track_id not in existing_map:
                max_pos += 1
                conn.execute(
                    "INSERT OR IGNORE INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                    (show_id, track_id, max_pos),
                )
        conn.commit()
        conn.close()

    def get_max_position(self, show_id: int) -> int:
        conn = self._connect()
        row = conn.execute(
            "SELECT COALESCE(MAX(position), 0) as max_pos FROM show_tracks WHERE show_id = ?",
            (show_id,),
        ).fetchone()
        conn.close()
        return row["max_pos"]

    def list_shows(self, limit: int = 20, offset: int = 0) -> list[Show]:
        conn = self._connect()
        rows = conn.execute(
            "SELECT * FROM shows ORDER BY week_start DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        conn.close()
        return [
            Show(
                id=row["id"],
                week_start=datetime.fromisoformat(row["week_start"]),
                week_end=datetime.fromisoformat(row["week_end"]),
                created_at=datetime.fromisoformat(row["created_at"]),
            )
            for row in rows
        ]

    def has_track_in_show(self, show_id: int, track_id: int) -> bool:
        conn = self._connect()
        row = conn.execute(
            "SELECT 1 FROM show_tracks WHERE show_id = ? AND track_id = ?",
            (show_id, track_id),
        ).fetchone()
        conn.close()
        return row is not None

    def remove_show_track(self, show_id: int, track_id: int) -> None:
        conn = self._connect()
        conn.execute(
            "DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?",
            (show_id, track_id),
        )
        # Re-compact positions
        rows = conn.execute(
            "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        for i, row in enumerate(rows, start=1):
            conn.execute(
                "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                (i, show_id, row["track_id"]),
            )
        conn.commit()
        conn.close()

    def move_show_track(self, show_id: int, track_id: int, new_position: int) -> None:
        conn = self._connect()
        # Get current ordered list
        rows = conn.execute(
            "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        ordered = [row["track_id"] for row in rows]
        # Remove and reinsert at new position
        ordered.remove(track_id)
        ordered.insert(new_position - 1, track_id)
        for i, tid in enumerate(ordered, start=1):
            conn.execute(
                "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                (i, show_id, tid),
            )
        conn.commit()
        conn.close()

    def add_track_to_show(self, show_id: int, track_id: int, position: int | None = None) -> None:
        conn = self._connect()
        if position is None:
            max_pos = self.get_max_position(show_id)
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, max_pos + 1),
            )
        else:
            # Shift existing tracks at >= position up by 1
            conn.execute(
                "UPDATE show_tracks SET position = position + 1 WHERE show_id = ? AND position >= ?",
                (show_id, position),
            )
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, position),
            )
        conn.commit()
        conn.close()
```

Note: `from datetime import datetime, timezone` and `from ntr_fetcher.models import Track, Show, ShowTrack` are already imported at the top of `db.py`.

**Step 4: Run tests to verify they pass**

Run: `pytest tests/test_db.py -v`
Expected: All tests pass.

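The position rules that the Task 6 tests pin down (append-only sync, removal with re-compaction, move, and insert-at-position) can be modelled on a plain Python list, independent of SQLite. The sketch below is illustrative only: the function names are hypothetical and are not part of `db.py`; they mirror the behaviour the tests expect rather than the `Database` implementation itself.

```python
"""Pure-Python model of the 1-based position rules used for show tracks.

All names here are hypothetical and exist only for illustration.
"""


def sync(ordered: list[int], track_ids: list[int]) -> list[int]:
    """Append unseen track ids; tracks already present keep their positions."""
    result = list(ordered)
    for track_id in track_ids:
        if track_id not in result:
            result.append(track_id)
    return result


def remove(ordered: list[int], track_id: int) -> list[int]:
    """Drop a track; later tracks shift up, so positions stay gap-free."""
    return [tid for tid in ordered if tid != track_id]


def move(ordered: list[int], track_id: int, new_position: int) -> list[int]:
    """Move a track to a 1-based position, shifting the others around it."""
    result = [tid for tid in ordered if tid != track_id]
    result.insert(new_position - 1, track_id)
    return result


def insert_at(ordered: list[int], track_id: int, position: int) -> list[int]:
    """Insert a new track at a 1-based position, pushing later tracks down."""
    result = list(ordered)
    result.insert(position - 1, track_id)
    return result


def positions(ordered: list[int]) -> dict[int, int]:
    """Map each track id to its 1-based position."""
    return {tid: i for i, tid in enumerate(ordered, start=1)}
```

Thinking of a show as an ordered list of track ids, with positions derived from list indices, makes it easier to predict what each database method should leave behind.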
**Step 5: Commit**

```bash
git add src/ntr_fetcher/db.py tests/test_db.py
git commit -m "feat: add database query methods for tracks, shows, and show_tracks"
```

---

## Task 7: SoundCloud Client - `client_id` Extraction

**Files:**
- Create: `src/ntr_fetcher/soundcloud.py`
- Create: `tests/test_soundcloud.py`

**Step 1: Write the failing test**

```python
# tests/test_soundcloud.py
import pytest
import httpx

from ntr_fetcher.soundcloud import SoundCloudClient

# Minimal page containing the hydration blob the client parses
FAKE_HTML = """
<html><head></head><body>
<script>window.__sc_hydration = [{"hydratable": "apiClient", "data": {"id": "test_client_id_abc123", "isExpiring": false}}];</script>
</body></html>
"""

FAKE_HTML_EXPIRING = """
<html><head></head><body>
<script>window.__sc_hydration = [{"hydratable": "apiClient", "data": {"id": "test_client_id_abc123", "isExpiring": true}}];</script>
</body></html>
"""


@pytest.mark.asyncio
async def test_extract_client_id(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    client = SoundCloudClient()
    client_id = await client._extract_client_id()
    assert client_id == "test_client_id_abc123"


@pytest.mark.asyncio
async def test_extract_client_id_caches(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    client = SoundCloudClient()
    id1 = await client._extract_client_id()
    id2 = await client._extract_client_id()
    assert id1 == id2
    assert len(httpx_mock.get_requests()) == 1  # Only fetched once


@pytest.mark.asyncio
async def test_extract_client_id_bad_html(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text="no hydration here")
    client = SoundCloudClient()
    with pytest.raises(ValueError, match="client_id"):
        await client._extract_client_id()
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_soundcloud.py -v`
Expected: FAIL with `ModuleNotFoundError`

**Step 3: Write the implementation**

```python
# src/ntr_fetcher/soundcloud.py
import json
import logging
import re

import httpx

logger = logging.getLogger(__name__)

SOUNDCLOUD_BASE = "https://soundcloud.com"
API_BASE = "https://api-v2.soundcloud.com"
HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL)


class SoundCloudClient:
    def __init__(self, http_client: httpx.AsyncClient | None = None):
        self._http = http_client or httpx.AsyncClient(timeout=15.0)
        self._client_id: str | None = None

    async def _extract_client_id(self) -> str:
        if self._client_id is not None:
            return self._client_id
        resp = await self._http.get(SOUNDCLOUD_BASE)
        resp.raise_for_status()
        match = HYDRATION_PATTERN.search(resp.text)
        if not match:
            raise ValueError("Could not find __sc_hydration in SoundCloud HTML; cannot extract client_id")
        hydration = json.loads(match.group(1))
        for entry in hydration:
            if entry.get("hydratable") == "apiClient":
                self._client_id = entry["data"]["id"]
                is_expiring = entry["data"].get("isExpiring", False)
                if is_expiring:
                    logger.warning("SoundCloud client_id is marked as expiring")
                return self._client_id
        raise ValueError("No apiClient entry in __sc_hydration; cannot extract client_id")

    def invalidate_client_id(self) -> None:
        self._client_id = None

    async def close(self) -> None:
        await self._http.aclose()
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_soundcloud.py -v`
Expected: 3 passed.

**Step 5: Commit**

```bash
git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py
git commit -m "feat: add SoundCloud client with client_id extraction"
```

---

## Task 8: SoundCloud Client - Resolve User & Fetch Likes

**Files:**
- Modify: `src/ntr_fetcher/soundcloud.py`
- Modify: `tests/test_soundcloud.py`

**Step 1: Write the failing tests**

Append to `tests/test_soundcloud.py`:

```python
import json as json_mod
import re  # needed for the URL patterns passed to httpx_mock below
from datetime import datetime, timezone

from ntr_fetcher.models import Track

FAKE_RESOLVE_RESPONSE = {"id": 206979918, "kind": "user", "username": "NICKtheRAT"}

FAKE_LIKES_RESPONSE = {
    "collection": [
        {
            "created_at": "2026-03-09T02:25:43Z",
            "kind": "like",
            "track": {
                "id": 12345,
                "title": "Test Track",
                "permalink_url": "https://soundcloud.com/artist/test-track",
                "duration": 180000,
                "full_duration": 180000,
                "genre": "Electronic",
                "tag_list": "",
                "created_at": "2026-03-01T00:00:00Z",
                "description": "",
                "artwork_url": "https://i1.sndcdn.com/artworks-abc-large.jpg",
                "license": "cc-by",
                "user": {
                    "id": 999,
                    "username": "TestArtist",
                    "permalink_url": "https://soundcloud.com/testartist",
                },
                "media": {"transcodings": []},
            },
        }
    ],
    "next_href": None,
}


@pytest.mark.asyncio
async def test_resolve_user(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/resolve.*"),
        json=FAKE_RESOLVE_RESPONSE,
    )
    client = SoundCloudClient()
    user_id = await client.resolve_user("nicktherat")
    assert user_id == 206979918


@pytest.mark.asyncio
async def test_fetch_likes(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 1
    assert tracks[0].title == "Test Track"
    assert tracks[0].artist == "TestArtist"
    assert tracks[0].id == 12345


@pytest.mark.asyncio
async def test_fetch_likes_filters_outside_range(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    # Range that excludes the track (liked at 2026-03-09)
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 12, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 0


@pytest.mark.asyncio
async def test_fetch_likes_retries_on_401(httpx_mock):
    """On 401, client_id is refreshed and the request is retried."""
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    # First likes call returns 401
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        status_code=401,
    )
    # Re-extraction gets new client_id
    httpx_mock.add_response(
        url="https://soundcloud.com",
        text=FAKE_HTML.replace("test_client_id_abc123", "new_client_id_456"),
    )
    # Retry succeeds
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 1
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_soundcloud.py -v`
Expected: FAIL with `AttributeError: 'SoundCloudClient' object has no attribute 'resolve_user'`

**Step 3: Write the implementation**

Add to `SoundCloudClient` in `src/ntr_fetcher/soundcloud.py`:

```python
    async def _api_get(self, url: str, params: dict | None = None) -> httpx.Response:
        """Make an API GET request with automatic client_id injection and 401 retry."""
        client_id = await self._extract_client_id()
        params = dict(params or {})
        params["client_id"] = client_id

        for attempt in range(3):
            resp = await self._http.get(url, params=params)
            if resp.status_code == 401:
                logger.warning("Got 401 from SoundCloud API, refreshing client_id (attempt %d)", attempt + 1)
                self.invalidate_client_id()
                client_id = await self._extract_client_id()
                params["client_id"] = client_id
                continue
            resp.raise_for_status()
            return resp

        raise httpx.HTTPStatusError(
            "Failed after 3 attempts (401)",
            request=resp.request,
            response=resp,
        )

    async def resolve_user(self, username: str) -> int:
        resp = await self._api_get(
            f"{API_BASE}/resolve",
            params={"url": f"{SOUNDCLOUD_BASE}/{username}"},
        )
        return resp.json()["id"]

    async def fetch_likes(
        self,
        user_id: int,
        since: datetime,
        until: datetime,
        limit: int = 50,
    ) -> list["Track"]:
        from ntr_fetcher.models import Track

        cursor = _build_cursor(until, user_id)
        collected: list[Track] = []

        while True:
            params = {"limit": limit}
            if cursor:
                params["offset"] = cursor

            resp = await self._api_get(f"{API_BASE}/users/{user_id}/likes", params=params)
            data = resp.json()

            collection = data.get("collection", [])
            if not collection:
                break

            stop = False
            for item in collection:
                liked_at_str = item.get("created_at", "")
                liked_at = datetime.fromisoformat(liked_at_str.replace("Z", "+00:00"))

                # Likes arrive newest first, so anything older than `since` ends the scan.
                if liked_at < since:
                    stop = True
                    break
                if liked_at > until:
                    continue

                track_data = item.get("track")
                if track_data is None:
                    continue

                user_data = track_data.get("user", {})
                collected.append(
                    Track(
                        id=track_data["id"],
                        title=track_data["title"],
                        artist=user_data.get("username", "Unknown"),
                        permalink_url=track_data["permalink_url"],
                        artwork_url=track_data.get("artwork_url"),
                        duration_ms=track_data.get("full_duration", track_data.get("duration", 0)),
                        license=track_data.get("license", ""),
                        liked_at=liked_at,
                        raw_json=json.dumps(track_data),
                    )
                )

            if stop:
                break

            next_href = data.get("next_href")
            if not next_href:
                break

            # Extract the next cursor from the `offset` query parameter of next_href
            parsed = urlparse(next_href)
            qs = parse_qs(parsed.query)
            cursor = qs.get("offset", [None])[0]
            if cursor is None:
                break

        # Return in chronological order (oldest first)
        collected.sort(key=lambda t: t.liked_at)
        return collected
```

Also add this module-level helper, plus any of these imports that are not already present at the top of the file (`fetch_likes` relies on `json`, `parse_qs`, and `urlparse`):

```python
import json
from datetime import datetime
from urllib.parse import parse_qs, urlparse


def _build_cursor(until: datetime, user_id: int) -> str:
    # The hardcoded "Z" suffix assumes `until` is already expressed in UTC.
    ts = until.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    padded_user = str(user_id).zfill(22)
    raw = f"{ts},user-track-likes,000-{padded_user}-99999999999999999999"
    return raw
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_soundcloud.py -v`
Expected: All tests pass.
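To make the pagination in `fetch_likes` concrete, here is a minimal standalone sketch of the two cursor operations: building the initial cursor the way `_build_cursor` does, and pulling the next cursor out of a `next_href`. The `next_cursor` helper name and the sample `next_href` value are illustrative assumptions, not part of the plan's code or a captured API response.

```python
from __future__ import annotations

from datetime import datetime, timezone
from urllib.parse import parse_qs, urlparse


def build_cursor(until: datetime, user_id: int) -> str:
    """Standalone copy of `_build_cursor`; assumes `until` is in UTC."""
    ts = until.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    padded_user = str(user_id).zfill(22)
    return f"{ts},user-track-likes,000-{padded_user}-99999999999999999999"


def next_cursor(next_href: str | None) -> str | None:
    """Hypothetical helper: return the `offset` query value, or None if absent."""
    if not next_href:
        return None
    query = parse_qs(urlparse(next_href).query)
    return query.get("offset", [None])[0]


cursor = build_cursor(datetime(2026, 3, 10, tzinfo=timezone.utc), 206979918)

# Made-up next_href for illustration; parse_qs percent-decodes the value.
href = "https://api-v2.soundcloud.com/users/206979918/likes?offset=abc%2C123&limit=50"
print(cursor)
print(next_cursor(href))
```

Because `parse_qs` already decodes the percent-encoded value and `httpx` re-encodes query parameters on the next request, the cursor can be passed straight back as `params["offset"]` without manual quoting.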
**Step 5: Commit**

```bash
git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py
git commit -m "feat: add user resolution and likes fetching with 401 retry"
```

---

## Task 9: Poller & Supervisor

**Files:**
- Create: `src/ntr_fetcher/poller.py`
- Create: `tests/test_poller.py`

**Step 1: Write the failing test**

```python
# tests/test_poller.py
import asyncio
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock

import pytest

from ntr_fetcher.poller import Poller
from ntr_fetcher.models import Track


def _make_track(id: int, liked_at: str) -> Track:
    return Track(
        id=id,
        title=f"Track {id}",
        artist="Artist",
        permalink_url=f"https://soundcloud.com/a/t-{id}",
        artwork_url=None,
        duration_ms=180000,
        license="cc-by",
        liked_at=datetime.fromisoformat(liked_at),
        raw_json="{}",
    )


@pytest.mark.asyncio
async def test_poll_once_fetches_and_stores():
    mock_sc = AsyncMock()
    mock_sc.resolve_user.return_value = 206979918
    mock_sc.fetch_likes.return_value = [
        _make_track(1, "2026-03-14T01:00:00+00:00"),
        _make_track(2, "2026-03-14T02:00:00+00:00"),
    ]

    mock_db = MagicMock()
    mock_show = MagicMock()
    mock_show.id = 1
    mock_db.get_or_create_show.return_value = mock_show

    poller = Poller(
        db=mock_db,
        soundcloud=mock_sc,
        soundcloud_user="nicktherat",
        show_day=2,
        show_hour=22,
        poll_interval=3600,
    )
    await poller.poll_once()

    assert mock_sc.resolve_user.called
    assert mock_sc.fetch_likes.called
    assert mock_db.upsert_track.call_count == 2
    assert mock_db.set_show_tracks.called
    call_args = mock_db.set_show_tracks.call_args
    assert call_args[0][0] == 1  # show_id
    assert call_args[0][1] == [1, 2]  # track_ids in order


@pytest.mark.asyncio
async def test_poll_once_full_refresh():
    mock_sc = AsyncMock()
    mock_sc.resolve_user.return_value = 206979918
    mock_sc.fetch_likes.return_value = [
        _make_track(1, "2026-03-14T01:00:00+00:00"),
    ]

    mock_db = MagicMock()
    mock_show = MagicMock()
    mock_show.id = 1
    mock_db.get_or_create_show.return_value = mock_show

    poller = Poller(
db=mock_db, soundcloud=mock_sc, soundcloud_user="nicktherat", show_day=2, show_hour=22, poll_interval=3600, ) await poller.poll_once(full=True) # full=True should still call fetch_likes assert mock_sc.fetch_likes.called @pytest.mark.asyncio async def test_supervisor_restarts_poller_on_failure(): call_count = 0 async def failing_poll(): nonlocal call_count call_count += 1 if call_count <= 2: raise RuntimeError("Simulated failure") mock_sc = AsyncMock() mock_db = MagicMock() poller = Poller( db=mock_db, soundcloud=mock_sc, soundcloud_user="nicktherat", show_day=2, show_hour=22, poll_interval=0.01, # Very short for testing ) poller.poll_once = failing_poll task = asyncio.create_task(poller.run_supervised(restart_delay=0.01)) await asyncio.sleep(0.2) task.cancel() try: await task except asyncio.CancelledError: pass assert call_count >= 3 # Failed twice, succeeded at least once ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_poller.py -v` Expected: FAIL — `ModuleNotFoundError` **Step 3: Write the implementation** ```python # src/ntr_fetcher/poller.py import asyncio import logging from datetime import datetime, timezone from ntr_fetcher.db import Database from ntr_fetcher.soundcloud import SoundCloudClient from ntr_fetcher.week import get_show_week logger = logging.getLogger(__name__) class Poller: def __init__( self, db: Database, soundcloud: SoundCloudClient, soundcloud_user: str, show_day: int, show_hour: int, poll_interval: float, ): self._db = db self._sc = soundcloud self._user = soundcloud_user self._show_day = show_day self._show_hour = show_hour self._poll_interval = poll_interval self._user_id: int | None = None self.last_fetch: datetime | None = None self.alive = True async def _get_user_id(self) -> int: if self._user_id is None: self._user_id = await self._sc.resolve_user(self._user) return self._user_id async def poll_once(self, full: bool = False) -> None: user_id = await self._get_user_id() now = datetime.now(timezone.utc) week_start, 
week_end = get_show_week(now, self._show_day, self._show_hour) show = self._db.get_or_create_show(week_start, week_end) tracks = await self._sc.fetch_likes( user_id=user_id, since=week_start, until=week_end, ) for track in tracks: self._db.upsert_track(track) track_ids = [t.id for t in tracks] self._db.set_show_tracks(show.id, track_ids) self.last_fetch = datetime.now(timezone.utc) logger.info("Fetched %d tracks for show %d", len(tracks), show.id) async def run_supervised(self, restart_delay: float = 30.0) -> None: while True: try: self.alive = True await self.poll_once() await asyncio.sleep(self._poll_interval) except asyncio.CancelledError: self.alive = False raise except Exception: self.alive = False logger.exception("Poller failed, restarting in %.1fs", restart_delay) await asyncio.sleep(restart_delay) ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_poller.py -v` Expected: 3 passed. **Step 5: Commit** ```bash git add src/ntr_fetcher/poller.py tests/test_poller.py git commit -m "feat: add poller with supervised restart loop" ``` --- ## Task 10: API Routes **Files:** - Create: `src/ntr_fetcher/api.py` - Create: `tests/test_api.py` **Step 1: Write the failing tests** ```python # tests/test_api.py from datetime import datetime, timezone import pytest from fastapi.testclient import TestClient from ntr_fetcher.api import create_app from ntr_fetcher.db import Database from ntr_fetcher.models import Track @pytest.fixture def db(tmp_path): database = Database(str(tmp_path / "test.db")) database.initialize() return database @pytest.fixture def app(db): from unittest.mock import AsyncMock, MagicMock poller = MagicMock() poller.last_fetch = datetime(2026, 3, 12, 12, 0, 0, tzinfo=timezone.utc) poller.alive = True poller.poll_once = AsyncMock() return create_app(db=db, poller=poller, admin_token="test-token") @pytest.fixture def client(app): return TestClient(app) def _seed_show(db): week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc) 
week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc) show = db.get_or_create_show(week_start, week_end) t1 = Track(1, "Song A", "Artist A", "https://soundcloud.com/a/1", None, 180000, "cc-by", datetime(2026, 3, 14, 1, 0, 0, tzinfo=timezone.utc), "{}") t2 = Track(2, "Song B", "Artist B", "https://soundcloud.com/b/2", None, 200000, "cc-by-sa", datetime(2026, 3, 14, 2, 0, 0, tzinfo=timezone.utc), "{}") db.upsert_track(t1) db.upsert_track(t2) db.set_show_tracks(show.id, [t1.id, t2.id]) return show def test_health(client): resp = client.get("/health") assert resp.status_code == 200 data = resp.json() assert data["status"] == "ok" assert data["poller_alive"] is True def test_playlist(client, db): _seed_show(db) resp = client.get("/playlist") assert resp.status_code == 200 data = resp.json() assert len(data["tracks"]) == 2 assert data["tracks"][0]["position"] == 1 assert data["tracks"][0]["title"] == "Song A" def test_playlist_by_position(client, db): _seed_show(db) resp = client.get("/playlist/2") assert resp.status_code == 200 assert resp.json()["title"] == "Song B" def test_playlist_by_position_not_found(client, db): _seed_show(db) resp = client.get("/playlist/99") assert resp.status_code == 404 def test_shows_list(client, db): _seed_show(db) resp = client.get("/shows") assert resp.status_code == 200 assert len(resp.json()) >= 1 def test_shows_detail(client, db): show = _seed_show(db) resp = client.get(f"/shows/{show.id}") assert resp.status_code == 200 assert len(resp.json()["tracks"]) == 2 def test_admin_refresh_requires_token(client): resp = client.post("/admin/refresh") assert resp.status_code == 401 def test_admin_refresh_with_token(client): resp = client.post( "/admin/refresh", headers={"Authorization": "Bearer test-token"}, json={"full": False}, ) assert resp.status_code == 200 def test_admin_remove_track(client, db): show = _seed_show(db) resp = client.delete( "/admin/tracks/1", headers={"Authorization": "Bearer test-token"}, ) assert resp.status_code 
== 200 tracks = db.get_show_tracks(show.id) assert len(tracks) == 1 ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_api.py -v` Expected: FAIL — `ModuleNotFoundError` **Step 3: Write the implementation** ```python # src/ntr_fetcher/api.py import logging from datetime import datetime, timezone from fastapi import FastAPI, HTTPException, Depends, Header from pydantic import BaseModel from ntr_fetcher.db import Database from ntr_fetcher.week import get_show_week logger = logging.getLogger(__name__) class RefreshRequest(BaseModel): full: bool = False class AddTrackRequest(BaseModel): soundcloud_url: str | None = None track_id: int | None = None position: int | None = None class MoveTrackRequest(BaseModel): position: int def create_app(db: Database, poller, admin_token: str) -> FastAPI: app = FastAPI(title="NtR SoundCloud Fetcher") def _require_admin(authorization: str = Header(None)): if authorization is None or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing or invalid token") token = authorization.removeprefix("Bearer ") if token != admin_token: raise HTTPException(status_code=401, detail="Invalid token") def _current_show(): now = datetime.now(timezone.utc) week_start, week_end = get_show_week(now, show_day=2, show_hour=22) return db.get_or_create_show(week_start, week_end) @app.get("/health") def health(): show = _current_show() tracks = db.get_show_tracks(show.id) return { "status": "ok", "poller_alive": poller.alive, "last_fetch": poller.last_fetch.isoformat() if poller.last_fetch else None, "current_week_track_count": len(tracks), } @app.get("/playlist") def playlist(): show = _current_show() tracks = db.get_show_tracks(show.id) return { "show_id": show.id, "week_start": show.week_start.isoformat(), "week_end": show.week_end.isoformat(), "tracks": tracks, } @app.get("/playlist/{position}") def playlist_track(position: int): show = _current_show() track = db.get_show_track_by_position(show.id, 
position) if track is None: raise HTTPException(status_code=404, detail=f"No track at position {position}") return track @app.get("/shows") def list_shows(limit: int = 20, offset: int = 0): shows = db.list_shows(limit=limit, offset=offset) return [ { "id": s.id, "week_start": s.week_start.isoformat(), "week_end": s.week_end.isoformat(), "created_at": s.created_at.isoformat(), } for s in shows ] @app.get("/shows/{show_id}") def show_detail(show_id: int): shows = db.list_shows(limit=1000, offset=0) show = next((s for s in shows if s.id == show_id), None) if show is None: raise HTTPException(status_code=404, detail="Show not found") tracks = db.get_show_tracks(show.id) return { "show_id": show.id, "week_start": show.week_start.isoformat(), "week_end": show.week_end.isoformat(), "tracks": tracks, } @app.post("/admin/refresh") async def admin_refresh(body: RefreshRequest = RefreshRequest(), _=Depends(_require_admin)): await poller.poll_once(full=body.full) show = _current_show() tracks = db.get_show_tracks(show.id) return {"status": "refreshed", "track_count": len(tracks)} @app.post("/admin/tracks") def admin_add_track(body: AddTrackRequest, _=Depends(_require_admin)): show = _current_show() if body.track_id is not None: db.add_track_to_show(show.id, body.track_id, body.position) return {"status": "added"} raise HTTPException(status_code=400, detail="Provide track_id") @app.delete("/admin/tracks/{track_id}") def admin_remove_track(track_id: int, _=Depends(_require_admin)): show = _current_show() if not db.has_track_in_show(show.id, track_id): raise HTTPException(status_code=404, detail="Track not in current show") db.remove_show_track(show.id, track_id) return {"status": "removed"} @app.put("/admin/tracks/{track_id}/position") def admin_move_track(track_id: int, body: MoveTrackRequest, _=Depends(_require_admin)): show = _current_show() if not db.has_track_in_show(show.id, track_id): raise HTTPException(status_code=404, detail="Track not in current show") 
        db.move_show_track(show.id, track_id, body.position)
        return {"status": "moved"}

    return app
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_api.py -v`
Expected: All tests pass.

**Step 5: Commit**

```bash
git add src/ntr_fetcher/api.py tests/test_api.py
git commit -m "feat: add FastAPI routes for playlist, shows, admin, and health"
```

---

## Task 11: Main Entry Point

**Files:**
- Create: `src/ntr_fetcher/main.py`

**Step 1: Write the entry point**

```python
# src/ntr_fetcher/main.py
import asyncio
import contextlib
import logging

import uvicorn

from ntr_fetcher.api import create_app
from ntr_fetcher.config import Settings
from ntr_fetcher.db import Database
from ntr_fetcher.poller import Poller
from ntr_fetcher.soundcloud import SoundCloudClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


def run() -> None:
    settings = Settings()

    db = Database(settings.db_path)
    db.initialize()
    logger.info("Database initialized at %s", settings.db_path)

    sc = SoundCloudClient()
    poller = Poller(
        db=db,
        soundcloud=sc,
        soundcloud_user=settings.soundcloud_user,
        show_day=settings.show_day,
        show_hour=settings.show_hour,
        poll_interval=settings.poll_interval_seconds,
    )

    app = create_app(db=db, poller=poller, admin_token=settings.admin_token)
    poller_task: asyncio.Task | None = None

    @app.on_event("startup")
    async def start_poller():
        nonlocal poller_task
        logger.info("Starting poller (interval=%ds)", settings.poll_interval_seconds)
        # Keep a reference: the event loop only holds weak references to tasks,
        # so an unreferenced task can be garbage-collected mid-run.
        poller_task = asyncio.create_task(poller.run_supervised())

    @app.on_event("shutdown")
    async def shutdown():
        logger.info("Shutting down")
        # Stop the poller before closing the HTTP client it depends on.
        if poller_task is not None:
            poller_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await poller_task
        await sc.close()

    uvicorn.run(app, host=settings.host, port=settings.port)


if __name__ == "__main__":
    run()
```

**Step 2: Verify it at least imports**

Run: `python -c "from ntr_fetcher.main import run; print('OK')"`
Expected: `OK`

**Step 3: Commit**

```bash
git add src/ntr_fetcher/main.py
git commit -m "feat: add main entry point wiring API server and poller"
```

---

## Task 12: README

**Files:**
- Create: `README.md`
**Step 1: Write the README**

````markdown
# NtR SoundCloud Fetcher

Fetches SoundCloud likes from NicktheRat's profile, builds weekly playlists aligned to the Wednesday 22:00 ET show schedule, and serves them via a JSON API.

## Quick Start

```bash
pip install -e ".[dev]"
export NTR_ADMIN_TOKEN="your-secret-here"
ntr-fetcher
```

The API starts at `http://127.0.0.1:8000`.

## API

| Endpoint | Description |
|----------|-------------|
| `GET /playlist` | Current week's playlist |
| `GET /playlist/{n}` | Track at position n |
| `GET /shows` | List all shows |
| `GET /shows/{id}` | Specific show's playlist |
| `GET /health` | Service health check |
| `POST /admin/refresh` | Trigger SoundCloud fetch (token required) |

## Configuration

Environment variables (prefix `NTR_`):

| Variable | Default | Description |
|----------|---------|-------------|
| `NTR_PORT` | `8000` | API port |
| `NTR_HOST` | `127.0.0.1` | Bind address |
| `NTR_DB_PATH` | `./ntr_fetcher.db` | SQLite path |
| `NTR_POLL_INTERVAL_SECONDS` | `3600` | Poll frequency |
| `NTR_ADMIN_TOKEN` | (required) | Admin bearer token |
| `NTR_SOUNDCLOUD_USER` | `nicktherat` | SoundCloud user |

## Development

```bash
pip install -e ".[dev]"
pytest
```
````

**Step 2: Commit**

```bash
git add README.md
git commit -m "docs: add README with quick start and API reference"
```

---

## Task 13: Run Full Test Suite

**Step 1: Run all tests**

Run: `pytest -v`
Expected: All tests pass.

**Step 2: Run ruff linter**

Run: `ruff check src/ tests/`
Expected: No errors (or fix any that appear).

**Step 3: Final commit if any fixes were needed**

```bash
git add -A
git commit -m "fix: address linting issues"
```