NtR-soudcloud-fetcher/docs/plans/2026-03-12-ntr-fetcher-implementation.md
# NtR SoundCloud Fetcher Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a Python service that polls NicktheRat's SoundCloud likes, builds weekly playlists, and serves them via a JSON API.

**Architecture:** Single async Python process — FastAPI serves the JSON API, an async background task polls SoundCloud hourly, and a supervisor loop restarts the poller on failure. SQLite stores all state. See `docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md` for the full design.

**Tech Stack:** Python 3.11+, FastAPI, uvicorn, httpx, pydantic, sqlite3, zoneinfo.

**Reference docs:**

- Design: `docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md`
- SoundCloud API: `docs/soundcloud-likes-api.md`
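The supervisor loop mentioned above is not spelled out anywhere in this chunk of the plan, so here is a rough standalone sketch of its intended shape. The function names and the 5-second restart delay are illustrative assumptions, not decisions this plan makes:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def poll_forever() -> None:
    # Placeholder for the real poller: fetch likes, update the DB, sleep an hour.
    while True:
        await asyncio.sleep(3600)


async def supervise(poller, restart_delay: float = 5.0) -> None:
    # Restart the poller whenever it dies, so a single crash does not
    # silently end background polling for the lifetime of the process.
    while True:
        try:
            await poller()
        except asyncio.CancelledError:
            raise  # shutdown: propagate instead of restarting
        except Exception:
            logger.exception("poller crashed; restarting in %.0fs", restart_delay)
            await asyncio.sleep(restart_delay)


# In the app's lifespan hook, something like:
#     task = asyncio.create_task(supervise(poll_forever))
```

Taking the poller as a parameter keeps the supervisor trivially testable: a test can pass in a coroutine that fails a fixed number of times and assert it was restarted.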
---
## Task 1: Project Scaffolding
**Files:**
- Create: `pyproject.toml`
- Create: `src/ntr_fetcher/__init__.py`
- Create: `tests/__init__.py`
- Create: `tests/conftest.py`
**Step 1: Create `pyproject.toml`**
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ntr-fetcher"
version = "0.1.0"
description = "SoundCloud likes fetcher for Nick the Rat Radio"
requires-python = ">=3.11"
dependencies = [
    "fastapi",
    "uvicorn[standard]",
    "httpx",
    "pydantic-settings",
]

[project.optional-dependencies]
dev = [
    "pytest",
    "pytest-asyncio",
    "pytest-httpx",
    "ruff",
]

[project.scripts]
ntr-fetcher = "ntr_fetcher.main:run"

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

[tool.ruff]
target-version = "py311"
src = ["src"]
```
**Step 2: Create package init**
```python
# src/ntr_fetcher/__init__.py
```
(Empty file — just marks the package.)
**Step 3: Create test scaffolding**
```python
# tests/__init__.py
```
```python
# tests/conftest.py
import pytest
```
**Step 4: Install the project in dev mode**
Run: `pip install -e ".[dev]"`
Expected: Successfully installed with all dependencies.
**Step 5: Verify pytest runs**
Run: `pytest --co`
Expected: "collected 0 items", exit code 5 (pytest exits with 5 when no tests are collected).
**Step 6: Commit**
```bash
git add pyproject.toml src/ tests/
git commit -m "scaffold: project structure with pyproject.toml and test config"
```
---
## Task 2: Configuration Module
**Files:**
- Create: `src/ntr_fetcher/config.py`
- Create: `tests/test_config.py`
**Step 1: Write the failing test**
```python
# tests/test_config.py
import pytest

from ntr_fetcher.config import Settings


def test_settings_defaults():
    settings = Settings(admin_token="test-secret")
    assert settings.port == 8000
    assert settings.host == "127.0.0.1"
    assert settings.db_path == "./ntr_fetcher.db"
    assert settings.poll_interval_seconds == 3600
    assert settings.soundcloud_user == "nicktherat"
    assert settings.show_day == 2
    assert settings.show_hour == 22


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("NTR_PORT", "9090")
    monkeypatch.setenv("NTR_HOST", "0.0.0.0")
    monkeypatch.setenv("NTR_ADMIN_TOKEN", "my-secret")
    monkeypatch.setenv("NTR_SOUNDCLOUD_USER", "someoneelse")
    settings = Settings()
    assert settings.port == 9090
    assert settings.host == "0.0.0.0"
    assert settings.admin_token == "my-secret"
    assert settings.soundcloud_user == "someoneelse"


def test_settings_admin_token_required():
    with pytest.raises(Exception):
        Settings()
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_config.py -v`
Expected: FAIL — `ModuleNotFoundError: No module named 'ntr_fetcher.config'`
**Step 3: Write the implementation**
```python
# src/ntr_fetcher/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    model_config = {"env_prefix": "NTR_"}

    port: int = 8000
    host: str = "127.0.0.1"
    db_path: str = "./ntr_fetcher.db"
    poll_interval_seconds: int = 3600
    admin_token: str
    soundcloud_user: str = "nicktherat"
    show_day: int = 2
    show_hour: int = 22
```
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_config.py -v`
Expected: 3 passed.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/config.py tests/test_config.py
git commit -m "feat: add configuration module with pydantic-settings"
```
---
## Task 3: Week Boundary Computation
**Files:**
- Create: `src/ntr_fetcher/week.py`
- Create: `tests/test_week.py`
This module computes the Wednesday 22:00 Eastern boundaries in UTC. Must handle EST/EDT transitions correctly.
**Step 1: Write the failing tests**
```python
# tests/test_week.py
from datetime import datetime, timezone

from ntr_fetcher.week import get_show_week


def test_mid_week_thursday():
    """Thursday should belong to the show that started the previous Wednesday."""
    # Thursday March 12, 2026 15:00 UTC (11:00 AM EDT)
    now = datetime(2026, 3, 12, 15, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Show started Wed March 11 22:00 EDT = March 12 02:00 UTC (DST active since March 8)
    assert start == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
    # Show ends next Wed March 18 22:00 EDT = March 19 02:00 UTC
    assert end == datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)


def test_wednesday_before_show():
    """Wednesday before 22:00 ET belongs to the previous week's show."""
    # Wednesday March 11, 2026 20:00 UTC (16:00 EDT — before 22:00 EDT)
    now = datetime(2026, 3, 11, 20, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Previous Wed was March 4 22:00 EST = March 5 03:00 UTC (before DST)
    assert start == datetime(2026, 3, 5, 3, 0, 0, tzinfo=timezone.utc)
    # Ends this Wed March 11 22:00 EDT = March 12 02:00 UTC (after DST)
    assert end == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)


def test_wednesday_after_show_starts():
    """Wednesday at or after 22:00 ET belongs to the new week."""
    # Wednesday March 11, 2026 at 22:30 EDT = March 12 02:30 UTC
    now = datetime(2026, 3, 12, 2, 30, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    assert start == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
    assert end == datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)


def test_est_period_no_dst():
    """January — firmly in EST (UTC-5)."""
    # Thursday Jan 15, 2026 12:00 UTC
    now = datetime(2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Wed Jan 14 22:00 EST = Jan 15 03:00 UTC
    assert start == datetime(2026, 1, 15, 3, 0, 0, tzinfo=timezone.utc)
    # Wed Jan 21 22:00 EST = Jan 22 03:00 UTC
    assert end == datetime(2026, 1, 22, 3, 0, 0, tzinfo=timezone.utc)
```

Note on the dates: in 2026, March 11 is the Wednesday (March 12 is a Thursday), so the show that covers mid-March starts Wed March 11 22:00 EDT = March 12 02:00 UTC. US DST begins Sunday March 8, 2026, so the EST/EDT split between the March 4 and March 11 boundaries is exactly what these tests exercise.
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_week.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Write the implementation**
```python
# src/ntr_fetcher/week.py
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")
Show_DAY_DEFAULT = 2  # Wednesday (Monday=0)
SHOW_HOUR_DEFAULT = 22


def get_show_week(
    now_utc: datetime,
    show_day: int = Show_DAY_DEFAULT,
    show_hour: int = SHOW_HOUR_DEFAULT,
) -> tuple[datetime, datetime]:
    """Return (week_start_utc, week_end_utc) for the show week containing now_utc.

    The week starts at show_day at show_hour Eastern Time and runs for 7 days.
    """
    now_et = now_utc.astimezone(EASTERN)
    # Find the most recent show_day at show_hour that is <= now
    days_since_show_day = (now_et.weekday() - show_day) % 7
    candidate_date = now_et.date() - timedelta(days=days_since_show_day)
    candidate = datetime(
        candidate_date.year,
        candidate_date.month,
        candidate_date.day,
        show_hour,
        0,
        0,
        tzinfo=EASTERN,
    )
    if candidate > now_et:
        # We're on the show day but before the show hour — go back a week
        candidate -= timedelta(days=7)
    week_start_utc = candidate.astimezone(timezone.utc)
    week_end_utc = (candidate + timedelta(days=7)).astimezone(timezone.utc)
    return week_start_utc, week_end_utc
```
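One subtlety worth keeping in mind: because both boundaries are wall-clock times in Eastern, a show week that spans the spring-forward transition is only 167 hours long in UTC (and 169 across fall-back). A quick standalone check with `zoneinfo`, using dates around the March 8, 2026 transition:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")

# Show week starting Wed March 4, 2026 22:00 EST; US DST begins March 8, 2026.
start_et = datetime(2026, 3, 4, 22, 0, tzinfo=EASTERN)
end_et = start_et + timedelta(days=7)  # wall-clock arithmetic: Wed March 11 22:00 EDT

span_utc = end_et.astimezone(timezone.utc) - start_et.astimezone(timezone.utc)
print(span_utc)  # 6 days, 23:00:00 (one hour short of 7 days)
```

This is intentional behavior, not a bug: the show always starts at 22:00 on the listener's clock, so the UTC span absorbs the DST shift.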
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_week.py -v`
Expected: 4 passed.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/week.py tests/test_week.py
git commit -m "feat: add week boundary computation with DST handling"
```
---
## Task 4: Models
**Files:**
- Create: `src/ntr_fetcher/models.py`
Simple dataclasses for passing data between layers. No test needed — these are pure data containers validated by type checkers and used transitively in other tests.
**Step 1: Write the models**
```python
# src/ntr_fetcher/models.py
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Track:
    id: int
    title: str
    artist: str
    permalink_url: str
    artwork_url: str | None
    duration_ms: int
    license: str
    liked_at: datetime
    raw_json: str


@dataclass(frozen=True)
class Show:
    id: int
    week_start: datetime
    week_end: datetime
    created_at: datetime


@dataclass(frozen=True)
class ShowTrack:
    show_id: int
    track_id: int
    position: int
```
**Step 2: Commit**
```bash
git add src/ntr_fetcher/models.py
git commit -m "feat: add data models for Track, Show, ShowTrack"
```
---
## Task 5: Database Module — Schema & Migrations
**Files:**
- Create: `src/ntr_fetcher/db.py`
- Create: `tests/test_db.py`
This is a larger module. We'll build it in stages: schema first, then queries.
**Step 1: Write the failing test for schema creation**
```python
# tests/test_db.py
import sqlite3

import pytest

from ntr_fetcher.db import Database


@pytest.fixture
def db(tmp_path):
    db_path = str(tmp_path / "test.db")
    database = Database(db_path)
    database.initialize()
    return database


def test_tables_created(db):
    conn = sqlite3.connect(db.path)
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    tables = [row[0] for row in cursor.fetchall()]
    conn.close()
    assert "tracks" in tables
    assert "shows" in tables
    assert "show_tracks" in tables


def test_initialize_idempotent(db):
    """Calling initialize twice doesn't raise."""
    db.initialize()
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_db.py::test_tables_created -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Write the schema initialization**
```python
# src/ntr_fetcher/db.py
import sqlite3
from datetime import datetime, timezone

from ntr_fetcher.models import Track, Show, ShowTrack

SCHEMA = """
CREATE TABLE IF NOT EXISTS tracks (
    id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    artist TEXT NOT NULL,
    permalink_url TEXT NOT NULL,
    artwork_url TEXT,
    duration_ms INTEGER NOT NULL,
    license TEXT NOT NULL DEFAULT '',
    liked_at TEXT NOT NULL,
    raw_json TEXT NOT NULL DEFAULT '{}'
);

CREATE TABLE IF NOT EXISTS shows (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    week_start TEXT NOT NULL,
    week_end TEXT NOT NULL,
    created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS show_tracks (
    show_id INTEGER NOT NULL REFERENCES shows(id),
    track_id INTEGER NOT NULL REFERENCES tracks(id),
    position INTEGER NOT NULL,
    UNIQUE(show_id, track_id)
);

CREATE INDEX IF NOT EXISTS idx_show_tracks_show_id ON show_tracks(show_id);
CREATE INDEX IF NOT EXISTS idx_tracks_liked_at ON tracks(liked_at);
"""


class Database:
    def __init__(self, path: str):
        self.path = path

    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.row_factory = sqlite3.Row
        return conn

    def initialize(self) -> None:
        conn = self._connect()
        conn.executescript(SCHEMA)
        conn.close()
```
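The `PRAGMA foreign_keys=ON` line in `_connect` is easy to overlook but load-bearing: SQLite leaves foreign-key enforcement off by default on every new connection, so without it the `REFERENCES` clauses in the schema are never enforced. A minimal standalone demonstration (table names mirror the schema above, but this snippet is independent of the plan's code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys=ON")  # enforcement is off by default per connection
conn.execute("CREATE TABLE shows (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE show_tracks (show_id INTEGER NOT NULL REFERENCES shows(id))"
)
try:
    # No show with id 42 exists, so this insert violates the foreign key.
    conn.execute("INSERT INTO show_tracks (show_id) VALUES (42)")
    rejected = False
except sqlite3.IntegrityError as exc:
    rejected = True
    print("rejected:", exc)
```

Because the pragma is per-connection, it must be issued inside `_connect` (as the plan does), not once at startup.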
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_db.py -v`
Expected: 2 passed.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/db.py tests/test_db.py
git commit -m "feat: add database module with schema initialization"
```
---
## Task 6: Database Module — Queries
**Files:**
- Modify: `src/ntr_fetcher/db.py`
- Modify: `tests/test_db.py`
Add methods for all the CRUD operations the service needs.
**Step 1: Write the failing tests**
Append to `tests/test_db.py`:
```python
from datetime import datetime, timezone

from ntr_fetcher.models import Track


def _make_track(id: int, liked_at: str, title: str = "Test", artist: str = "Artist") -> Track:
    return Track(
        id=id,
        title=title,
        artist=artist,
        permalink_url=f"https://soundcloud.com/test/track-{id}",
        artwork_url=None,
        duration_ms=180000,
        license="cc-by",
        liked_at=datetime.fromisoformat(liked_at),
        raw_json="{}",
    )


def test_upsert_track(db):
    track = _make_track(100, "2026-03-10T12:00:00+00:00")
    db.upsert_track(track)
    result = db.get_track(100)
    assert result is not None
    assert result.title == "Test"


def test_upsert_track_updates_existing(db):
    track1 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Original")
    db.upsert_track(track1)
    track2 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Updated")
    db.upsert_track(track2)
    result = db.get_track(100)
    assert result.title == "Updated"


def test_get_or_create_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    assert show.id is not None
    assert show.week_start == week_start
    # Calling again returns the same show
    show2 = db.get_or_create_show(week_start, week_end)
    assert show2.id == show.id


def test_set_show_tracks(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00", title="Second")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["position"] == 1
    assert tracks[0]["title"] == "First"
    assert tracks[1]["position"] == 2


def test_set_show_tracks_preserves_existing_positions(db):
    """Adding a new track doesn't change existing track positions."""
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    db.upsert_track(t2)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    tracks = db.get_show_tracks(show.id)
    assert tracks[0]["track_id"] == 1
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 2
    assert tracks[1]["position"] == 2


def test_set_show_tracks_removes_unliked(db):
    """Tracks no longer in the likes list are removed and positions re-compact."""
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00", title="Second")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00", title="Third")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])
    # Nick unlikes track 2
    db.set_show_tracks(show.id, [t1.id, t3.id])
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["track_id"] == 1
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 3
    assert tracks[1]["position"] == 2  # re-compacted from 3


def test_get_show_track_by_position(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    result = db.get_show_track_by_position(show.id, 1)
    assert result is not None
    assert result["title"] == "First"
    result_missing = db.get_show_track_by_position(show.id, 99)
    assert result_missing is None


def test_list_shows(db):
    s1 = db.get_or_create_show(
        datetime(2026, 3, 6, 3, 0, 0, tzinfo=timezone.utc),
        datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc),
    )
    s2 = db.get_or_create_show(
        datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc),
        datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc),
    )
    shows = db.list_shows(limit=10, offset=0)
    assert len(shows) == 2
    # Newest first
    assert shows[0].id == s2.id
    assert shows[1].id == s1.id


def test_max_position_for_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    assert db.get_max_position(show.id) == 0
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    assert db.get_max_position(show.id) == 1


def test_remove_show_track(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])
    db.remove_show_track(show.id, 2)
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["position"] == 1
    assert tracks[0]["track_id"] == 1
    # Position re-compacted: track 3 moved from position 3 to 2
    assert tracks[1]["position"] == 2
    assert tracks[1]["track_id"] == 3


def test_move_show_track(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])
    # Move track 3 from position 3 to position 1
    db.move_show_track(show.id, track_id=3, new_position=1)
    tracks = db.get_show_tracks(show.id)
    assert tracks[0]["track_id"] == 3
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 1
    assert tracks[1]["position"] == 2
    assert tracks[2]["track_id"] == 2
    assert tracks[2]["position"] == 3


def test_add_track_to_show_at_position(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    # Insert track 3 at position 2 — pushes track 2 to position 3
    db.add_track_to_show(show.id, track_id=3, position=2)
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 3
    assert tracks[0]["track_id"] == 1
    assert tracks[1]["track_id"] == 3
    assert tracks[2]["track_id"] == 2


def test_has_track_in_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    assert db.has_track_in_show(show.id, 1) is True
    assert db.has_track_in_show(show.id, 999) is False
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_db.py -v`
Expected: FAIL — `AttributeError: 'Database' object has no attribute 'upsert_track'`
**Step 3: Write the query methods**
Add these methods to the `Database` class in `src/ntr_fetcher/db.py`:
```python
    def upsert_track(self, track: Track) -> None:
        conn = self._connect()
        conn.execute(
            """INSERT INTO tracks (id, title, artist, permalink_url, artwork_url,
                                   duration_ms, license, liked_at, raw_json)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(id) DO UPDATE SET
                   title=excluded.title,
                   artist=excluded.artist,
                   permalink_url=excluded.permalink_url,
                   artwork_url=excluded.artwork_url,
                   duration_ms=excluded.duration_ms,
                   license=excluded.license,
                   liked_at=excluded.liked_at,
                   raw_json=excluded.raw_json""",
            (
                track.id,
                track.title,
                track.artist,
                track.permalink_url,
                track.artwork_url,
                track.duration_ms,
                track.license,
                track.liked_at.isoformat(),
                track.raw_json,
            ),
        )
        conn.commit()
        conn.close()

    def get_track(self, track_id: int) -> Track | None:
        conn = self._connect()
        row = conn.execute("SELECT * FROM tracks WHERE id = ?", (track_id,)).fetchone()
        conn.close()
        if row is None:
            return None
        return Track(
            id=row["id"],
            title=row["title"],
            artist=row["artist"],
            permalink_url=row["permalink_url"],
            artwork_url=row["artwork_url"],
            duration_ms=row["duration_ms"],
            license=row["license"],
            liked_at=datetime.fromisoformat(row["liked_at"]),
            raw_json=row["raw_json"],
        )

    def get_or_create_show(self, week_start: datetime, week_end: datetime) -> Show:
        conn = self._connect()
        start_iso = week_start.isoformat()
        end_iso = week_end.isoformat()
        row = conn.execute(
            "SELECT * FROM shows WHERE week_start = ? AND week_end = ?",
            (start_iso, end_iso),
        ).fetchone()
        if row is not None:
            conn.close()
            return Show(
                id=row["id"],
                week_start=datetime.fromisoformat(row["week_start"]),
                week_end=datetime.fromisoformat(row["week_end"]),
                created_at=datetime.fromisoformat(row["created_at"]),
            )
        now_iso = datetime.now(timezone.utc).isoformat()
        cursor = conn.execute(
            "INSERT INTO shows (week_start, week_end, created_at) VALUES (?, ?, ?)",
            (start_iso, end_iso, now_iso),
        )
        conn.commit()
        show = Show(
            id=cursor.lastrowid,
            week_start=week_start,
            week_end=week_end,
            created_at=datetime.fromisoformat(now_iso),
        )
        conn.close()
        return show

    def get_show_tracks(self, show_id: int) -> list[dict]:
        conn = self._connect()
        rows = conn.execute(
            """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url,
                      t.artwork_url, t.duration_ms, t.liked_at
               FROM show_tracks st
               JOIN tracks t ON st.track_id = t.id
               WHERE st.show_id = ?
               ORDER BY st.position""",
            (show_id,),
        ).fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def get_show_track_by_position(self, show_id: int, position: int) -> dict | None:
        conn = self._connect()
        row = conn.execute(
            """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url,
                      t.artwork_url, t.duration_ms, t.liked_at
               FROM show_tracks st
               JOIN tracks t ON st.track_id = t.id
               WHERE st.show_id = ? AND st.position = ?""",
            (show_id, position),
        ).fetchone()
        conn.close()
        return dict(row) if row else None

    def set_show_tracks(self, show_id: int, track_ids: list[int]) -> None:
        """Sync show tracks to match track_ids list.

        - Preserves positions of already-assigned tracks.
        - Appends new tracks after the current max position.
        - Removes tracks no longer in track_ids and re-compacts positions.
        """
        conn = self._connect()
        existing = conn.execute(
            "SELECT track_id, position FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        existing_map = {row["track_id"]: row["position"] for row in existing}
        track_id_set = set(track_ids)
        # Remove tracks that are no longer liked
        removed = [tid for tid in existing_map if tid not in track_id_set]
        for tid in removed:
            conn.execute(
                "DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?",
                (show_id, tid),
            )
        # Add new tracks at the end
        max_pos = max(existing_map.values()) if existing_map else 0
        for track_id in track_ids:
            if track_id not in existing_map:
                max_pos += 1
                conn.execute(
                    "INSERT OR IGNORE INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                    (show_id, track_id, max_pos),
                )
        # Re-compact positions if anything was removed
        if removed:
            rows = conn.execute(
                "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
                (show_id,),
            ).fetchall()
            for i, row in enumerate(rows, start=1):
                conn.execute(
                    "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                    (i, show_id, row["track_id"]),
                )
        conn.commit()
        conn.close()

    def get_max_position(self, show_id: int) -> int:
        conn = self._connect()
        row = conn.execute(
            "SELECT COALESCE(MAX(position), 0) AS max_pos FROM show_tracks WHERE show_id = ?",
            (show_id,),
        ).fetchone()
        conn.close()
        return row["max_pos"]

    def list_shows(self, limit: int = 20, offset: int = 0) -> list[Show]:
        conn = self._connect()
        rows = conn.execute(
            "SELECT * FROM shows ORDER BY week_start DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        conn.close()
        return [
            Show(
                id=row["id"],
                week_start=datetime.fromisoformat(row["week_start"]),
                week_end=datetime.fromisoformat(row["week_end"]),
                created_at=datetime.fromisoformat(row["created_at"]),
            )
            for row in rows
        ]

    def has_track_in_show(self, show_id: int, track_id: int) -> bool:
        conn = self._connect()
        row = conn.execute(
            "SELECT 1 FROM show_tracks WHERE show_id = ? AND track_id = ?",
            (show_id, track_id),
        ).fetchone()
        conn.close()
        return row is not None

    def remove_show_track(self, show_id: int, track_id: int) -> None:
        conn = self._connect()
        conn.execute(
            "DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?",
            (show_id, track_id),
        )
        # Re-compact positions
        rows = conn.execute(
            "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        for i, row in enumerate(rows, start=1):
            conn.execute(
                "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                (i, show_id, row["track_id"]),
            )
        conn.commit()
        conn.close()

    def move_show_track(self, show_id: int, track_id: int, new_position: int) -> None:
        conn = self._connect()
        # Get current ordered list
        rows = conn.execute(
            "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        ordered = [row["track_id"] for row in rows]
        # Remove and reinsert at new position
        ordered.remove(track_id)
        ordered.insert(new_position - 1, track_id)
        for i, tid in enumerate(ordered, start=1):
            conn.execute(
                "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                (i, show_id, tid),
            )
        conn.commit()
        conn.close()

    def add_track_to_show(self, show_id: int, track_id: int, position: int | None = None) -> None:
        conn = self._connect()
        if position is None:
            max_pos = self.get_max_position(show_id)
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, max_pos + 1),
            )
        else:
            # Shift existing tracks at >= position up by 1
            conn.execute(
                "UPDATE show_tracks SET position = position + 1 WHERE show_id = ? AND position >= ?",
                (show_id, position),
            )
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, position),
            )
        conn.commit()
        conn.close()
```
Note: `from datetime import datetime, timezone` and `from ntr_fetcher.models import Track, Show, ShowTrack` are already imported at the top of `db.py`.
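Each method above follows the same open/commit/close pattern on a short-lived connection. If that boilerplate grows tiresome during implementation, it can be factored into a small context manager; a sketch (the `_tx` name is hypothetical, not part of this plan):

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def _tx(path: str):
    # Hypothetical helper: open a connection, yield it, commit on
    # success, and always close, mirroring the per-method pattern above.
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()


# Usage sketch with an in-memory database (one with-block = one connection):
with _tx(":memory:") as conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t (x) VALUES (1)")
    count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
print(count)  # 1
```

Either style works for this service; the per-method version in the plan keeps each operation self-contained, which matters little at this scale.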
**Step 4: Run tests to verify they pass**
Run: `pytest tests/test_db.py -v`
Expected: All tests pass.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/db.py tests/test_db.py
git commit -m "feat: add database query methods for tracks, shows, and show_tracks"
```
---
## Task 7: SoundCloud Client — `client_id` Extraction
**Files:**
- Create: `src/ntr_fetcher/soundcloud.py`
- Create: `tests/test_soundcloud.py`
**Step 1: Write the failing test**
```python
# tests/test_soundcloud.py
import pytest

from ntr_fetcher.soundcloud import SoundCloudClient

FAKE_HTML = """
<html><head><script>
window.__sc_hydration = [
{"hydratable": "user", "data": {}},
{"hydratable": "apiClient", "data": {"id": "test_client_id_abc123", "isExpiring": false}}
];
</script></head></html>
"""

FAKE_HTML_EXPIRING = """
<html><head><script>
window.__sc_hydration = [
{"hydratable": "apiClient", "data": {"id": "expiring_id_xyz", "isExpiring": true}}
];
</script></head></html>
"""


@pytest.mark.asyncio
async def test_extract_client_id(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    client = SoundCloudClient()
    client_id = await client._extract_client_id()
    assert client_id == "test_client_id_abc123"


@pytest.mark.asyncio
async def test_extract_client_id_caches(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    client = SoundCloudClient()
    id1 = await client._extract_client_id()
    id2 = await client._extract_client_id()
    assert id1 == id2
    assert len(httpx_mock.get_requests()) == 1  # Only fetched once


@pytest.mark.asyncio
async def test_extract_client_id_bad_html(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text="<html>no hydration here</html>")
    client = SoundCloudClient()
    with pytest.raises(ValueError, match="client_id"):
        await client._extract_client_id()
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_soundcloud.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Write the implementation**
```python
# src/ntr_fetcher/soundcloud.py
import json
import logging
import re

import httpx

logger = logging.getLogger(__name__)

SOUNDCLOUD_BASE = "https://soundcloud.com"
API_BASE = "https://api-v2.soundcloud.com"
HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL)


class SoundCloudClient:
    def __init__(self, http_client: httpx.AsyncClient | None = None):
        self._http = http_client or httpx.AsyncClient(timeout=15.0)
        self._client_id: str | None = None

    async def _extract_client_id(self) -> str:
        if self._client_id is not None:
            return self._client_id
        resp = await self._http.get(SOUNDCLOUD_BASE)
        resp.raise_for_status()
        match = HYDRATION_PATTERN.search(resp.text)
        if not match:
            raise ValueError("Could not find __sc_hydration in SoundCloud HTML — cannot extract client_id")
        hydration = json.loads(match.group(1))
        for entry in hydration:
            if entry.get("hydratable") == "apiClient":
                self._client_id = entry["data"]["id"]
                is_expiring = entry["data"].get("isExpiring", False)
                if is_expiring:
                    logger.warning("SoundCloud client_id is marked as expiring")
                return self._client_id
        raise ValueError("No apiClient entry in __sc_hydration — cannot extract client_id")

    def invalidate_client_id(self) -> None:
        self._client_id = None

    async def close(self) -> None:
        await self._http.aclose()
```
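The regex-plus-JSON extraction can be sanity-checked in isolation, without httpx or the client class. The sample HTML below is made up and mirrors the shape the tests above assume, not a captured SoundCloud page:

```python
import json
import re

HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL)

html = """
<script>window.__sc_hydration = [
{"hydratable": "user", "data": {}},
{"hydratable": "apiClient", "data": {"id": "abc123", "isExpiring": false}}
];</script>
"""

match = HYDRATION_PATTERN.search(html)
hydration = json.loads(match.group(1))  # captured group is the JSON array
client_id = next(
    entry["data"]["id"] for entry in hydration if entry.get("hydratable") == "apiClient"
)
print(client_id)  # abc123
```

One caveat to keep in mind when hardening this later: the lazy `\[.*?\]` stops at the first `]` followed by `;`, so a real page whose hydration array contains nested arrays before the apiClient entry could truncate the match; the tests in this plan do not exercise that case.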
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_soundcloud.py -v`
Expected: 3 passed.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py
git commit -m "feat: add SoundCloud client with client_id extraction"
```
---
## Task 8: SoundCloud Client — Resolve User & Fetch Likes
**Files:**
- Modify: `src/ntr_fetcher/soundcloud.py`
- Modify: `tests/test_soundcloud.py`
**Step 1: Write the failing tests**
Append to `tests/test_soundcloud.py`:
```python
import re
from datetime import datetime, timezone

FAKE_RESOLVE_RESPONSE = {"id": 206979918, "kind": "user", "username": "NICKtheRAT"}

FAKE_LIKES_RESPONSE = {
    "collection": [
        {
            "created_at": "2026-03-09T02:25:43Z",
            "kind": "like",
            "track": {
                "id": 12345,
                "title": "Test Track",
                "permalink_url": "https://soundcloud.com/artist/test-track",
                "duration": 180000,
                "full_duration": 180000,
                "genre": "Electronic",
                "tag_list": "",
                "created_at": "2026-03-01T00:00:00Z",
                "description": "",
                "artwork_url": "https://i1.sndcdn.com/artworks-abc-large.jpg",
                "license": "cc-by",
                "user": {
                    "id": 999,
                    "username": "TestArtist",
                    "permalink_url": "https://soundcloud.com/testartist",
                },
                "media": {"transcodings": []},
            },
        }
    ],
    "next_href": None,
}


@pytest.mark.asyncio
async def test_resolve_user(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/resolve.*"),
        json=FAKE_RESOLVE_RESPONSE,
    )
    client = SoundCloudClient()
    user_id = await client.resolve_user("nicktherat")
    assert user_id == 206979918


@pytest.mark.asyncio
async def test_fetch_likes(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 1
    assert tracks[0].title == "Test Track"
    assert tracks[0].artist == "TestArtist"
    assert tracks[0].id == 12345


@pytest.mark.asyncio
async def test_fetch_likes_filters_outside_range(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    # Range that excludes the track (liked at 2026-03-09)
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 12, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 0


@pytest.mark.asyncio
async def test_fetch_likes_retries_on_401(httpx_mock):
    """On 401, client_id is refreshed and the request is retried."""
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    # First likes call returns 401
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        status_code=401,
    )
    # Re-extraction gets new client_id
    httpx_mock.add_response(
        url="https://soundcloud.com",
        text=FAKE_HTML.replace("test_client_id_abc123", "new_client_id_456"),
    )
    # Retry succeeds
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 1
```

Note the added `import re` — these tests match API URLs with `re.compile`, and nothing earlier in `tests/test_soundcloud.py` imports it.
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_soundcloud.py -v`
Expected: FAIL — `AttributeError: 'SoundCloudClient' object has no attribute 'resolve_user'`
**Step 3: Write the implementation**
Add to `SoundCloudClient` in `src/ntr_fetcher/soundcloud.py`:
```python
async def _api_get(self, url: str, params: dict | None = None) -> httpx.Response:
"""Make an API GET request with automatic client_id injection and 401 retry."""
client_id = await self._extract_client_id()
params = dict(params or {})
params["client_id"] = client_id
for attempt in range(3):
resp = await self._http.get(url, params=params)
if resp.status_code == 401:
logger.warning("Got 401 from SoundCloud API, refreshing client_id (attempt %d)", attempt + 1)
self.invalidate_client_id()
client_id = await self._extract_client_id()
params["client_id"] = client_id
continue
resp.raise_for_status()
return resp
raise httpx.HTTPStatusError(
"Failed after 3 attempts (401)",
request=resp.request,
response=resp,
)
async def resolve_user(self, username: str) -> int:
resp = await self._api_get(
f"{API_BASE}/resolve",
params={"url": f"{SOUNDCLOUD_BASE}/{username}"},
)
return resp.json()["id"]
async def fetch_likes(
self,
user_id: int,
since: datetime,
until: datetime,
limit: int = 50,
) -> list["Track"]:
from ntr_fetcher.models import Track
cursor = _build_cursor(until, user_id)
collected: list[Track] = []
while True:
params = {"limit": limit}
if cursor:
params["offset"] = cursor
resp = await self._api_get(f"{API_BASE}/users/{user_id}/likes", params=params)
data = resp.json()
collection = data.get("collection", [])
if not collection:
break
stop = False
for item in collection:
liked_at_str = item.get("created_at", "")
liked_at = datetime.fromisoformat(liked_at_str.replace("Z", "+00:00"))
if liked_at < since:
stop = True
break
if liked_at > until:
continue
track_data = item.get("track")
if track_data is None:
continue
user_data = track_data.get("user", {})
collected.append(
Track(
id=track_data["id"],
title=track_data["title"],
artist=user_data.get("username", "Unknown"),
permalink_url=track_data["permalink_url"],
artwork_url=track_data.get("artwork_url"),
duration_ms=track_data.get("full_duration", track_data.get("duration", 0)),
license=track_data.get("license", ""),
liked_at=liked_at,
raw_json=json.dumps(track_data),
)
)
if stop:
break
next_href = data.get("next_href")
if not next_href:
break
# Extract cursor from next_href
from urllib.parse import urlparse, parse_qs
parsed = urlparse(next_href)
qs = parse_qs(parsed.query)
cursor = qs.get("offset", [None])[0]
if cursor is None:
break
# Return in chronological order (oldest first)
collected.sort(key=lambda t: t.liked_at)
return collected
```
Also add this module-level helper and the missing imports at the top of the file. Note that `fetch_likes` calls `json.dumps`, so `json` must be imported; `urllib.parse.quote` is not needed because httpx URL-encodes query parameters itself:
```python
import json
from datetime import datetime

def _build_cursor(until: datetime, user_id: int) -> str:
    """Build a synthetic likes-pagination cursor anchored at `until`."""
    ts = until.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    padded_user = str(user_id).zfill(22)
    return f"{ts},user-track-likes,000-{padded_user}-99999999999999999999"
```
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_soundcloud.py -v`
Expected: All tests pass.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py
git commit -m "feat: add user resolution and likes fetching with 401 retry"
```
---
## Task 9: Poller & Supervisor
**Files:**
- Create: `src/ntr_fetcher/poller.py`
- Create: `tests/test_poller.py`
**Step 1: Write the failing test**
```python
# tests/test_poller.py
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from ntr_fetcher.poller import Poller
from ntr_fetcher.models import Track
def _make_track(id: int, liked_at: str) -> Track:
return Track(
id=id,
title=f"Track {id}",
artist="Artist",
permalink_url=f"https://soundcloud.com/a/t-{id}",
artwork_url=None,
duration_ms=180000,
license="cc-by",
liked_at=datetime.fromisoformat(liked_at),
raw_json="{}",
)
@pytest.mark.asyncio
async def test_poll_once_fetches_and_stores():
mock_sc = AsyncMock()
mock_sc.resolve_user.return_value = 206979918
mock_sc.fetch_likes.return_value = [
_make_track(1, "2026-03-14T01:00:00+00:00"),
_make_track(2, "2026-03-14T02:00:00+00:00"),
]
mock_db = MagicMock()
mock_show = MagicMock()
mock_show.id = 1
mock_db.get_or_create_show.return_value = mock_show
poller = Poller(
db=mock_db,
soundcloud=mock_sc,
soundcloud_user="nicktherat",
show_day=2,
show_hour=22,
poll_interval=3600,
)
await poller.poll_once()
assert mock_sc.resolve_user.called
assert mock_sc.fetch_likes.called
assert mock_db.upsert_track.call_count == 2
assert mock_db.set_show_tracks.called
call_args = mock_db.set_show_tracks.call_args
assert call_args[0][0] == 1 # show_id
assert call_args[0][1] == [1, 2] # track_ids in order
@pytest.mark.asyncio
async def test_poll_once_removes_unliked_tracks():
"""When a track disappears from likes, it should be removed from the show."""
mock_sc = AsyncMock()
mock_sc.resolve_user.return_value = 206979918
# First poll: two tracks
mock_sc.fetch_likes.return_value = [
_make_track(1, "2026-03-14T01:00:00+00:00"),
_make_track(2, "2026-03-14T02:00:00+00:00"),
]
mock_db = MagicMock()
mock_show = MagicMock()
mock_show.id = 1
mock_db.get_or_create_show.return_value = mock_show
poller = Poller(
db=mock_db,
soundcloud=mock_sc,
soundcloud_user="nicktherat",
show_day=2,
show_hour=22,
poll_interval=3600,
)
await poller.poll_once()
# Second poll: Nick unliked track 2
mock_sc.fetch_likes.return_value = [
_make_track(1, "2026-03-14T01:00:00+00:00"),
]
mock_db.reset_mock()
mock_db.get_or_create_show.return_value = mock_show
await poller.poll_once()
call_args = mock_db.set_show_tracks.call_args
assert call_args[0][1] == [1] # only track 1 remains
@pytest.mark.asyncio
async def test_poll_once_full_refresh():
mock_sc = AsyncMock()
mock_sc.resolve_user.return_value = 206979918
mock_sc.fetch_likes.return_value = [
_make_track(1, "2026-03-14T01:00:00+00:00"),
]
mock_db = MagicMock()
mock_show = MagicMock()
mock_show.id = 1
mock_db.get_or_create_show.return_value = mock_show
poller = Poller(
db=mock_db,
soundcloud=mock_sc,
soundcloud_user="nicktherat",
show_day=2,
show_hour=22,
poll_interval=3600,
)
await poller.poll_once(full=True)
# full=True should still call fetch_likes
assert mock_sc.fetch_likes.called
@pytest.mark.asyncio
async def test_supervisor_restarts_poller_on_failure():
call_count = 0
async def failing_poll():
nonlocal call_count
call_count += 1
if call_count <= 2:
raise RuntimeError("Simulated failure")
mock_sc = AsyncMock()
mock_db = MagicMock()
poller = Poller(
db=mock_db,
soundcloud=mock_sc,
soundcloud_user="nicktherat",
show_day=2,
show_hour=22,
poll_interval=0.01, # Very short for testing
)
poller.poll_once = failing_poll
task = asyncio.create_task(poller.run_supervised(restart_delay=0.01))
await asyncio.sleep(0.2)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert call_count >= 3 # Failed twice, succeeded at least once
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_poller.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Write the implementation**
```python
# src/ntr_fetcher/poller.py
import asyncio
import logging
from datetime import datetime, timezone
from ntr_fetcher.db import Database
from ntr_fetcher.soundcloud import SoundCloudClient
from ntr_fetcher.week import get_show_week
logger = logging.getLogger(__name__)
class Poller:
def __init__(
self,
db: Database,
soundcloud: SoundCloudClient,
soundcloud_user: str,
show_day: int,
show_hour: int,
poll_interval: float,
):
self._db = db
self._sc = soundcloud
self._user = soundcloud_user
self._show_day = show_day
self._show_hour = show_hour
self._poll_interval = poll_interval
self._user_id: int | None = None
self.last_fetch: datetime | None = None
self.alive = True
async def _get_user_id(self) -> int:
if self._user_id is None:
self._user_id = await self._sc.resolve_user(self._user)
return self._user_id
async def poll_once(self, full: bool = False) -> None:
user_id = await self._get_user_id()
now = datetime.now(timezone.utc)
week_start, week_end = get_show_week(now, self._show_day, self._show_hour)
show = self._db.get_or_create_show(week_start, week_end)
tracks = await self._sc.fetch_likes(
user_id=user_id,
since=week_start,
until=week_end,
)
for track in tracks:
self._db.upsert_track(track)
track_ids = [t.id for t in tracks]
self._db.set_show_tracks(show.id, track_ids)
self.last_fetch = datetime.now(timezone.utc)
logger.info("Fetched %d tracks for show %d", len(tracks), show.id)
async def run_supervised(self, restart_delay: float = 30.0) -> None:
while True:
try:
self.alive = True
await self.poll_once()
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError:
self.alive = False
raise
except Exception:
self.alive = False
logger.exception("Poller failed, restarting in %.1fs", restart_delay)
await asyncio.sleep(restart_delay)
```
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_poller.py -v`
Expected: 4 passed.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/poller.py tests/test_poller.py
git commit -m "feat: add poller with supervised restart loop"
```
---
## Task 10: API Routes
**Files:**
- Create: `src/ntr_fetcher/api.py`
- Create: `tests/test_api.py`
**Step 1: Write the failing tests**
```python
# tests/test_api.py
from datetime import datetime, timezone
import pytest
from fastapi.testclient import TestClient
from ntr_fetcher.api import create_app
from ntr_fetcher.db import Database
from ntr_fetcher.models import Track
@pytest.fixture
def db(tmp_path):
database = Database(str(tmp_path / "test.db"))
database.initialize()
return database
@pytest.fixture
def app(db):
from unittest.mock import AsyncMock, MagicMock
poller = MagicMock()
poller.last_fetch = datetime(2026, 3, 12, 12, 0, 0, tzinfo=timezone.utc)
poller.alive = True
poller.poll_once = AsyncMock()
return create_app(db=db, poller=poller, admin_token="test-token")
@pytest.fixture
def client(app):
return TestClient(app)
def _seed_show(db):
week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
show = db.get_or_create_show(week_start, week_end)
t1 = Track(1, "Song A", "Artist A", "https://soundcloud.com/a/1", None, 180000, "cc-by",
datetime(2026, 3, 14, 1, 0, 0, tzinfo=timezone.utc), "{}")
t2 = Track(2, "Song B", "Artist B", "https://soundcloud.com/b/2", None, 200000, "cc-by-sa",
datetime(2026, 3, 14, 2, 0, 0, tzinfo=timezone.utc), "{}")
db.upsert_track(t1)
db.upsert_track(t2)
db.set_show_tracks(show.id, [t1.id, t2.id])
return show
def test_health(client):
resp = client.get("/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["poller_alive"] is True
def test_playlist(client, db):
_seed_show(db)
resp = client.get("/playlist")
assert resp.status_code == 200
data = resp.json()
assert len(data["tracks"]) == 2
assert data["tracks"][0]["position"] == 1
assert data["tracks"][0]["title"] == "Song A"
def test_playlist_by_position(client, db):
_seed_show(db)
resp = client.get("/playlist/2")
assert resp.status_code == 200
assert resp.json()["title"] == "Song B"
def test_playlist_by_position_not_found(client, db):
_seed_show(db)
resp = client.get("/playlist/99")
assert resp.status_code == 404
def test_shows_list(client, db):
_seed_show(db)
resp = client.get("/shows")
assert resp.status_code == 200
assert len(resp.json()) >= 1
def test_shows_detail(client, db):
show = _seed_show(db)
resp = client.get(f"/shows/{show.id}")
assert resp.status_code == 200
assert len(resp.json()["tracks"]) == 2
def test_admin_refresh_requires_token(client):
resp = client.post("/admin/refresh")
assert resp.status_code == 401
def test_admin_refresh_with_token(client):
resp = client.post(
"/admin/refresh",
headers={"Authorization": "Bearer test-token"},
json={"full": False},
)
assert resp.status_code == 200
def test_admin_remove_track(client, db):
show = _seed_show(db)
resp = client.delete(
"/admin/tracks/1",
headers={"Authorization": "Bearer test-token"},
)
assert resp.status_code == 200
tracks = db.get_show_tracks(show.id)
assert len(tracks) == 1
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_api.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Write the implementation**
```python
# src/ntr_fetcher/api.py
import logging
from datetime import datetime, timezone
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from ntr_fetcher.db import Database
from ntr_fetcher.week import get_show_week
logger = logging.getLogger(__name__)
class RefreshRequest(BaseModel):
full: bool = False
class AddTrackRequest(BaseModel):
soundcloud_url: str | None = None
track_id: int | None = None
position: int | None = None
class MoveTrackRequest(BaseModel):
position: int
def create_app(
    db: Database,
    poller,
    admin_token: str,
    show_day: int = 2,
    show_hour: int = 22,
) -> FastAPI:
    app = FastAPI(title="NtR SoundCloud Fetcher")

    def _require_admin(authorization: str | None = Header(None)):
        if authorization is None or not authorization.startswith("Bearer "):
            raise HTTPException(status_code=401, detail="Missing or invalid token")
        token = authorization.removeprefix("Bearer ")
        if token != admin_token:
            raise HTTPException(status_code=401, detail="Invalid token")

    def _current_show():
        now = datetime.now(timezone.utc)
        # show_day/show_hour default to the Wednesday 22:00 schedule but are
        # injectable, so the API and poller week logic stay in sync.
        week_start, week_end = get_show_week(now, show_day=show_day, show_hour=show_hour)
        return db.get_or_create_show(week_start, week_end)
@app.get("/health")
def health():
show = _current_show()
tracks = db.get_show_tracks(show.id)
return {
"status": "ok",
"poller_alive": poller.alive,
"last_fetch": poller.last_fetch.isoformat() if poller.last_fetch else None,
"current_week_track_count": len(tracks),
}
@app.get("/playlist")
def playlist():
show = _current_show()
tracks = db.get_show_tracks(show.id)
return {
"show_id": show.id,
"week_start": show.week_start.isoformat(),
"week_end": show.week_end.isoformat(),
"tracks": tracks,
}
@app.get("/playlist/{position}")
def playlist_track(position: int):
show = _current_show()
track = db.get_show_track_by_position(show.id, position)
if track is None:
raise HTTPException(status_code=404, detail=f"No track at position {position}")
return track
@app.get("/shows")
def list_shows(limit: int = 20, offset: int = 0):
shows = db.list_shows(limit=limit, offset=offset)
return [
{
"id": s.id,
"week_start": s.week_start.isoformat(),
"week_end": s.week_end.isoformat(),
"created_at": s.created_at.isoformat(),
}
for s in shows
]
@app.get("/shows/{show_id}")
def show_detail(show_id: int):
        # Database exposes no get_show(id) helper in this plan, so scan the listing.
        shows = db.list_shows(limit=1000, offset=0)
        show = next((s for s in shows if s.id == show_id), None)
if show is None:
raise HTTPException(status_code=404, detail="Show not found")
tracks = db.get_show_tracks(show.id)
return {
"show_id": show.id,
"week_start": show.week_start.isoformat(),
"week_end": show.week_end.isoformat(),
"tracks": tracks,
}
@app.post("/admin/refresh")
async def admin_refresh(body: RefreshRequest = RefreshRequest(), _=Depends(_require_admin)):
await poller.poll_once(full=body.full)
show = _current_show()
tracks = db.get_show_tracks(show.id)
return {"status": "refreshed", "track_count": len(tracks)}
@app.post("/admin/tracks")
def admin_add_track(body: AddTrackRequest, _=Depends(_require_admin)):
show = _current_show()
if body.track_id is not None:
db.add_track_to_show(show.id, body.track_id, body.position)
return {"status": "added"}
raise HTTPException(status_code=400, detail="Provide track_id")
@app.delete("/admin/tracks/{track_id}")
def admin_remove_track(track_id: int, _=Depends(_require_admin)):
show = _current_show()
if not db.has_track_in_show(show.id, track_id):
raise HTTPException(status_code=404, detail="Track not in current show")
db.remove_show_track(show.id, track_id)
return {"status": "removed"}
@app.put("/admin/tracks/{track_id}/position")
def admin_move_track(track_id: int, body: MoveTrackRequest, _=Depends(_require_admin)):
show = _current_show()
if not db.has_track_in_show(show.id, track_id):
raise HTTPException(status_code=404, detail="Track not in current show")
db.move_show_track(show.id, track_id, body.position)
return {"status": "moved"}
return app
```
**Step 4: Run test to verify it passes**
Run: `pytest tests/test_api.py -v`
Expected: All tests pass.
**Step 5: Commit**
```bash
git add src/ntr_fetcher/api.py tests/test_api.py
git commit -m "feat: add FastAPI routes for playlist, shows, admin, and health"
```
---
## Task 11: Main Entry Point
**Files:**
- Create: `src/ntr_fetcher/main.py`
**Step 1: Write the entry point**
```python
# src/ntr_fetcher/main.py
import asyncio
import logging
import uvicorn
from ntr_fetcher.api import create_app
from ntr_fetcher.config import Settings
from ntr_fetcher.db import Database
from ntr_fetcher.poller import Poller
from ntr_fetcher.soundcloud import SoundCloudClient
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
def run() -> None:
settings = Settings()
db = Database(settings.db_path)
db.initialize()
logger.info("Database initialized at %s", settings.db_path)
sc = SoundCloudClient()
poller = Poller(
db=db,
soundcloud=sc,
soundcloud_user=settings.soundcloud_user,
show_day=settings.show_day,
show_hour=settings.show_hour,
poll_interval=settings.poll_interval_seconds,
)
app = create_app(db=db, poller=poller, admin_token=settings.admin_token)
    @app.on_event("startup")
    async def start_poller():
        logger.info("Starting poller (interval=%ds)", settings.poll_interval_seconds)
        # Hold a reference on app.state so the task isn't garbage-collected.
        app.state.poller_task = asyncio.create_task(poller.run_supervised())
@app.on_event("shutdown")
async def shutdown():
logger.info("Shutting down")
await sc.close()
uvicorn.run(app, host=settings.host, port=settings.port)
if __name__ == "__main__":
run()
```
**Step 2: Verify it at least imports**
Run: `python -c "from ntr_fetcher.main import run; print('OK')"`
Expected: `OK`
**Step 3: Commit**
```bash
git add src/ntr_fetcher/main.py
git commit -m "feat: add main entry point wiring API server and poller"
```
---
## Task 12: README
**Files:**
- Create: `README.md`
**Step 1: Write the README**
````markdown
# NtR SoundCloud Fetcher
Fetches SoundCloud likes from NicktheRat's profile, builds weekly playlists
aligned to the Wednesday 22:00 ET show schedule, and serves them via a JSON
API.
## Quick Start
```bash
pip install -e ".[dev]"
export NTR_ADMIN_TOKEN="your-secret-here"
ntr-fetcher
```
The API starts at `http://127.0.0.1:8000`.
## API
| Endpoint | Description |
|----------|-------------|
| `GET /playlist` | Current week's playlist |
| `GET /playlist/{n}` | Track at position n |
| `GET /shows` | List all shows |
| `GET /shows/{id}` | Specific show's playlist |
| `GET /health` | Service health check |
| `POST /admin/refresh` | Trigger SoundCloud fetch (token required) |
| `POST /admin/tracks` | Add a track to the current show (token required) |
| `DELETE /admin/tracks/{id}` | Remove a track from the current show (token required) |
| `PUT /admin/tracks/{id}/position` | Move a track within the current show (token required) |
## Configuration
Environment variables (prefix `NTR_`):
| Variable | Default | Description |
|----------|---------|-------------|
| `NTR_PORT` | `8000` | API port |
| `NTR_HOST` | `127.0.0.1` | Bind address |
| `NTR_DB_PATH` | `./ntr_fetcher.db` | SQLite path |
| `NTR_POLL_INTERVAL_SECONDS` | `3600` | Poll frequency |
| `NTR_ADMIN_TOKEN` | (required) | Admin bearer token |
| `NTR_SOUNDCLOUD_USER` | `nicktherat` | SoundCloud user |
## Development
```bash
pip install -e ".[dev]"
pytest
```
````
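Once the service is running locally, the README's endpoints can be exercised manually; a sketch using curl against the defaults (`your-secret-here` stands in for whatever token was exported):

```shell
# Public endpoints
curl http://127.0.0.1:8000/health
curl http://127.0.0.1:8000/playlist
curl http://127.0.0.1:8000/playlist/1

# Admin endpoints need the bearer token
curl -X POST http://127.0.0.1:8000/admin/refresh \
  -H "Authorization: Bearer your-secret-here" \
  -H "Content-Type: application/json" \
  -d '{"full": false}'

curl -X DELETE http://127.0.0.1:8000/admin/tracks/12345 \
  -H "Authorization: Bearer your-secret-here"
```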
**Step 2: Commit**
```bash
git add README.md
git commit -m "docs: add README with quick start and API reference"
```
---
## Task 13: Run Full Test Suite
**Step 1: Run all tests**
Run: `pytest -v`
Expected: All tests pass.
**Step 2: Run ruff linter**
Run: `ruff check src/ tests/`
Expected: No errors (or fix any that appear).
**Step 3: Final commit if any fixes were needed**
```bash
git add -A
git commit -m "fix: address linting issues"
```