NtR-soudcloud-fetcher/docs/plans/2026-03-12-ntr-fetcher-implementation.md
cottongin 22f6b5cbca design: sync unliked tracks — remove from show and re-compact positions
Nick is the host; if he unlikes a track, the service should respect
that and remove it from the show playlist. Positions re-compact after
removal. The tracks table retains the record for historical reference.

Made-with: Cursor
2026-03-12 01:13:19 -04:00

NtR SoundCloud Fetcher Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build a Python service that polls NicktheRat's SoundCloud likes, builds weekly playlists, and serves them via a JSON API.

Architecture: Single async Python process — FastAPI serves the JSON API, an async background task polls SoundCloud hourly, and a supervisor loop restarts the poller on failure. SQLite stores all state. See docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md for the full design.

Tech Stack: Python 3.11+, FastAPI, uvicorn, httpx, pydantic, sqlite3, zoneinfo.

Reference docs:

  • Design: docs/plans/2026-03-12-ntr-soundcloud-fetcher-design.md
  • SoundCloud API: docs/soundcloud-likes-api.md

Task 1: Project Scaffolding

Files:

  • Create: pyproject.toml
  • Create: src/ntr_fetcher/__init__.py
  • Create: tests/__init__.py
  • Create: tests/conftest.py

Step 1: Create pyproject.toml

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ntr-fetcher"
version = "0.1.0"
description = "SoundCloud likes fetcher for Nick the Rat Radio"
requires-python = ">=3.11"
dependencies = [
    "fastapi",
    "uvicorn[standard]",
    "httpx",
    "pydantic-settings",
]

[project.optional-dependencies]
dev = [
    "pytest",
    "pytest-asyncio",
    "pytest-httpx",
    "ruff",
]

[project.scripts]
ntr-fetcher = "ntr_fetcher.main:run"

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

[tool.ruff]
target-version = "py311"
src = ["src"]

Step 2: Create package init

# src/ntr_fetcher/__init__.py

(Empty file — just marks the package.)

Step 3: Create test scaffolding

# tests/__init__.py

# tests/conftest.py
import pytest

Step 4: Install the project in dev mode

Run: pip install -e ".[dev]" Expected: Successfully installed with all dependencies.

Step 5: Verify pytest runs

Run: pytest --co Expected: "collected 0 items" and exit code 5, which is pytest's status for "no tests collected" rather than a failure.

Step 6: Commit

git add pyproject.toml src/ tests/
git commit -m "scaffold: project structure with pyproject.toml and test config"

Task 2: Configuration Module

Files:

  • Create: src/ntr_fetcher/config.py
  • Create: tests/test_config.py

Step 1: Write the failing test

# tests/test_config.py
import pytest
from pydantic import ValidationError

from ntr_fetcher.config import Settings


def test_settings_defaults():
    settings = Settings(admin_token="test-secret")
    assert settings.port == 8000
    assert settings.host == "127.0.0.1"
    assert settings.db_path == "./ntr_fetcher.db"
    assert settings.poll_interval_seconds == 3600
    assert settings.soundcloud_user == "nicktherat"
    assert settings.show_day == 2
    assert settings.show_hour == 22


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("NTR_PORT", "9090")
    monkeypatch.setenv("NTR_HOST", "0.0.0.0")
    monkeypatch.setenv("NTR_ADMIN_TOKEN", "my-secret")
    monkeypatch.setenv("NTR_SOUNDCLOUD_USER", "someoneelse")
    settings = Settings()
    assert settings.port == 9090
    assert settings.host == "0.0.0.0"
    assert settings.admin_token == "my-secret"
    assert settings.soundcloud_user == "someoneelse"


def test_settings_admin_token_required(monkeypatch):
    # Clear any token exported in the developer's shell so the test is hermetic.
    monkeypatch.delenv("NTR_ADMIN_TOKEN", raising=False)
    with pytest.raises(ValidationError):
        Settings()

Step 2: Run test to verify it fails

Run: pytest tests/test_config.py -v Expected: FAIL — ModuleNotFoundError: No module named 'ntr_fetcher.config'

Step 3: Write the implementation

# src/ntr_fetcher/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    model_config = {"env_prefix": "NTR_"}

    port: int = 8000
    host: str = "127.0.0.1"
    db_path: str = "./ntr_fetcher.db"
    poll_interval_seconds: int = 3600
    admin_token: str
    soundcloud_user: str = "nicktherat"
    show_day: int = 2
    show_hour: int = 22

Step 4: Run test to verify it passes

Run: pytest tests/test_config.py -v Expected: 3 passed.

Step 5: Commit

git add src/ntr_fetcher/config.py tests/test_config.py
git commit -m "feat: add configuration module with pydantic-settings"

Task 3: Week Boundary Computation

Files:

  • Create: src/ntr_fetcher/week.py
  • Create: tests/test_week.py

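This module computes the show-week boundaries (Wednesday 22:00 Eastern Time to the following Wednesday 22:00) and returns them in UTC. It must handle EST/EDT transitions correctly: a week that spans a clock change is 167 or 169 hours long rather than 168.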
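
As a quick worked example of why the boundary cannot be a fixed UTC hour, the snippet below converts two Wednesday 22:00 Eastern start times to UTC using only the standard library. The helper name `show_start_utc` is made up for this illustration and is not part of the planned module.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")


def show_start_utc(year: int, month: int, day: int, hour: int = 22) -> datetime:
    """Convert a wall-clock Eastern show start to UTC."""
    local = datetime(year, month, day, hour, tzinfo=EASTERN)
    return local.astimezone(timezone.utc)


# Both dates are Wednesdays (weekday() == 2, with Monday == 0).
winter = show_start_utc(2026, 1, 14)  # EST, UTC-5
spring = show_start_utc(2026, 3, 11)  # EDT, UTC-4 (DST began March 8, 2026)

print(winter.isoformat())  # 2026-01-15T03:00:00+00:00
print(spring.isoformat())  # 2026-03-12T02:00:00+00:00
```

The same local time lands on 03:00 UTC in winter and 02:00 UTC once daylight saving time is active.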

Step 1: Write the failing tests

# tests/test_week.py
from datetime import datetime, timezone

from ntr_fetcher.week import get_show_week


def test_mid_week_thursday():
    """Thursday should belong to the show that started the previous Wednesday."""
    # Thursday March 12, 2026 15:00 UTC (11:00 AM EDT)
    now = datetime(2026, 3, 12, 15, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Show started Wed March 11 22:00 EDT = March 12 02:00 UTC (DST active since March 8)
    assert start == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
    # Show ends next Wed March 18 22:00 EDT = March 19 02:00 UTC
    assert end == datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)


def test_wednesday_before_show():
    """Wednesday before 22:00 ET belongs to the previous week's show."""
    # Wednesday March 11, 2026 20:00 UTC (16:00 EDT, before 22:00 EDT)
    now = datetime(2026, 3, 11, 20, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Previous Wed was March 4 22:00 EST = March 5 03:00 UTC (before DST)
    assert start == datetime(2026, 3, 5, 3, 0, 0, tzinfo=timezone.utc)
    # Ends this Wed March 11 22:00 EDT = March 12 02:00 UTC (after DST)
    assert end == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)


def test_wednesday_after_show_starts():
    """Wednesday at or after 22:00 ET belongs to the new week."""
    # Wednesday March 11, 2026 at 22:30 EDT = March 12 02:30 UTC
    now = datetime(2026, 3, 12, 2, 30, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    assert start == datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
    assert end == datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)


def test_est_period_no_dst():
    """January — firmly in EST (UTC-5)."""
    # Thursday Jan 15, 2026 12:00 UTC
    now = datetime(2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
    start, end = get_show_week(now, show_day=2, show_hour=22)
    # Wed Jan 14 22:00 EST = Jan 15 03:00 UTC
    assert start == datetime(2026, 1, 15, 3, 0, 0, tzinfo=timezone.utc)
    # Wed Jan 21 22:00 EST = Jan 22 03:00 UTC
    assert end == datetime(2026, 1, 22, 3, 0, 0, tzinfo=timezone.utc)

Step 2: Run test to verify it fails

Run: pytest tests/test_week.py -v Expected: FAIL — ModuleNotFoundError

Step 3: Write the implementation

# src/ntr_fetcher/week.py
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")
Show_DAY_DEFAULT = 2  # Wednesday (Monday=0)
SHOW_HOUR_DEFAULT = 22


def get_show_week(
    now_utc: datetime,
    show_day: int = Show_DAY_DEFAULT,
    show_hour: int = SHOW_HOUR_DEFAULT,
) -> tuple[datetime, datetime]:
    """Return (week_start_utc, week_end_utc) for the show week containing now_utc.

    The week starts at show_day at show_hour Eastern Time and runs for 7 days.
    """
    now_et = now_utc.astimezone(EASTERN)

    # Find the most recent show_day at show_hour that is <= now
    days_since_show_day = (now_et.weekday() - show_day) % 7
    candidate_date = now_et.date() - timedelta(days=days_since_show_day)
    candidate = datetime(
        candidate_date.year,
        candidate_date.month,
        candidate_date.day,
        show_hour,
        0,
        0,
        tzinfo=EASTERN,
    )

    if candidate > now_et:
        # We're on the show day but before the show hour — go back a week
        candidate -= timedelta(days=7)

    # astimezone() already returns a UTC-aware datetime, so no further fix-up is needed
    week_start_utc = candidate.astimezone(timezone.utc)
    week_end_utc = (candidate + timedelta(days=7)).astimezone(timezone.utc)

    return week_start_utc, week_end_utc

Step 4: Run test to verify it passes

Run: pytest tests/test_week.py -v Expected: 4 passed.
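
The implementation relies on `timedelta(days=7)` being applied to an Eastern-aware datetime before converting to UTC. A small standalone check of that behaviour, separate from the module itself, might look like this; the week chosen is the one that crosses the March 8, 2026 clock change.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")

start_et = datetime(2026, 3, 4, 22, tzinfo=EASTERN)  # Wednesday, still EST
end_et = start_et + timedelta(days=7)                # next Wednesday, now EDT

# Adding a timedelta to an aware datetime moves the wall clock, so the
# local hour stays 22:00 and the UTC offset is re-evaluated for the new date.
start_utc = start_et.astimezone(timezone.utc)
end_utc = end_et.astimezone(timezone.utc)
elapsed_hours = (end_utc - start_utc) / timedelta(hours=1)

print(end_et.hour, elapsed_hours)  # 22 167.0
```

Had the seven days been added after converting to UTC, the week would end at 23:00 local time instead of 22:00.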

Step 5: Commit

git add src/ntr_fetcher/week.py tests/test_week.py
git commit -m "feat: add week boundary computation with DST handling"

Task 4: Models

Files:

  • Create: src/ntr_fetcher/models.py

Simple dataclasses for passing data between layers. No test needed — these are pure data containers validated by type checkers and used transitively in other tests.
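
For readers unfamiliar with frozen dataclasses, here is a minimal sketch of what `frozen=True` provides. `Slot` is a hypothetical stand-in with the same fields as `ShowTrack`; it is not part of the plan.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Slot:  # hypothetical stand-in for ShowTrack
    show_id: int
    track_id: int
    position: int


slot = Slot(show_id=1, track_id=42, position=3)

# Assigning to a field of a frozen instance raises instead of mutating it.
try:
    slot.position = 2
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a modified copy and leaves the original untouched.
moved = replace(slot, position=2)

print(mutated, slot.position, moved.position)  # False 3 2
```

Frozen instances are also hashable by default, so they can be placed in sets or used as dict keys.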

Step 1: Write the models

# src/ntr_fetcher/models.py
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Track:
    id: int
    title: str
    artist: str
    permalink_url: str
    artwork_url: str | None
    duration_ms: int
    license: str
    liked_at: datetime
    raw_json: str


@dataclass(frozen=True)
class Show:
    id: int
    week_start: datetime
    week_end: datetime
    created_at: datetime


@dataclass(frozen=True)
class ShowTrack:
    show_id: int
    track_id: int
    position: int

Step 2: Commit

git add src/ntr_fetcher/models.py
git commit -m "feat: add data models for Track, Show, ShowTrack"

Task 5: Database Module — Schema & Migrations

Files:

  • Create: src/ntr_fetcher/db.py
  • Create: tests/test_db.py

This is a larger module. We'll build it in stages: schema first, then queries.

Step 1: Write the failing test for schema creation

# tests/test_db.py
import sqlite3

import pytest

from ntr_fetcher.db import Database


@pytest.fixture
def db(tmp_path):
    db_path = str(tmp_path / "test.db")
    database = Database(db_path)
    database.initialize()
    return database


def test_tables_created(db):
    conn = sqlite3.connect(db.path)
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    tables = [row[0] for row in cursor.fetchall()]
    conn.close()
    assert "tracks" in tables
    assert "shows" in tables
    assert "show_tracks" in tables


def test_initialize_idempotent(db):
    """Calling initialize twice doesn't raise."""
    db.initialize()

Step 2: Run test to verify it fails

Run: pytest tests/test_db.py::test_tables_created -v Expected: FAIL — ModuleNotFoundError

Step 3: Write the schema initialization

# src/ntr_fetcher/db.py
import sqlite3
from datetime import datetime, timezone

from ntr_fetcher.models import Track, Show, ShowTrack

SCHEMA = """
CREATE TABLE IF NOT EXISTS tracks (
    id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    artist TEXT NOT NULL,
    permalink_url TEXT NOT NULL,
    artwork_url TEXT,
    duration_ms INTEGER NOT NULL,
    license TEXT NOT NULL DEFAULT '',
    liked_at TEXT NOT NULL,
    raw_json TEXT NOT NULL DEFAULT '{}'
);

CREATE TABLE IF NOT EXISTS shows (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    week_start TEXT NOT NULL,
    week_end TEXT NOT NULL,
    created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS show_tracks (
    show_id INTEGER NOT NULL REFERENCES shows(id),
    track_id INTEGER NOT NULL REFERENCES tracks(id),
    position INTEGER NOT NULL,
    UNIQUE(show_id, track_id)
);

CREATE INDEX IF NOT EXISTS idx_show_tracks_show_id ON show_tracks(show_id);
CREATE INDEX IF NOT EXISTS idx_tracks_liked_at ON tracks(liked_at);
"""


class Database:
    def __init__(self, path: str):
        self.path = path

    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.row_factory = sqlite3.Row
        return conn

    def initialize(self) -> None:
        conn = self._connect()
        conn.executescript(SCHEMA)
        conn.close()

Step 4: Run test to verify it passes

Run: pytest tests/test_db.py -v Expected: 2 passed.
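
To see what the schema's constraints do in practice, the following standalone sketch builds a cut-down copy of the three tables in an in-memory database and tries two bad inserts. The column lists are shortened for the demo and `try_link` is a hypothetical helper, so treat it as an illustration rather than as part of `db.py`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Foreign-key enforcement is a per-connection setting and is usually off by default.
conn.execute("PRAGMA foreign_keys=ON")
conn.executescript(
    """
    CREATE TABLE shows (id INTEGER PRIMARY KEY AUTOINCREMENT, week_start TEXT NOT NULL);
    CREATE TABLE tracks (id INTEGER PRIMARY KEY, title TEXT NOT NULL);
    CREATE TABLE show_tracks (
        show_id INTEGER NOT NULL REFERENCES shows(id),
        track_id INTEGER NOT NULL REFERENCES tracks(id),
        position INTEGER NOT NULL,
        UNIQUE(show_id, track_id)
    );
    INSERT INTO shows (week_start) VALUES ('2026-03-12T02:00:00+00:00');
    INSERT INTO tracks (id, title) VALUES (1, 'First');
    INSERT INTO show_tracks (show_id, track_id, position) VALUES (1, 1, 1);
    """
)


def try_link(show_id: int, track_id: int, position: int) -> bool:
    """Return True if the row was accepted, False if a constraint rejected it."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, position),
            )
        return True
    except sqlite3.IntegrityError:
        return False


duplicate_ok = try_link(1, 1, 2)  # same (show_id, track_id) pair again
orphan_ok = try_link(1, 999, 2)   # track 999 does not exist

print(duplicate_ok, orphan_ok)  # False False
```

This is why `_connect` sets the pragma on every new connection: without it, the orphan row would be accepted silently.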

Step 5: Commit

git add src/ntr_fetcher/db.py tests/test_db.py
git commit -m "feat: add database module with schema initialization"

Task 6: Database Module — Queries

Files:

  • Modify: src/ntr_fetcher/db.py
  • Modify: tests/test_db.py

Add methods for all the CRUD operations the service needs.

Step 1: Write the failing tests

Append to tests/test_db.py:

from datetime import datetime, timezone

from ntr_fetcher.models import Track


def _make_track(id: int, liked_at: str, title: str = "Test", artist: str = "Artist") -> Track:
    return Track(
        id=id,
        title=title,
        artist=artist,
        permalink_url=f"https://soundcloud.com/test/track-{id}",
        artwork_url=None,
        duration_ms=180000,
        license="cc-by",
        liked_at=datetime.fromisoformat(liked_at),
        raw_json="{}",
    )


def test_upsert_track(db):
    track = _make_track(100, "2026-03-10T12:00:00+00:00")
    db.upsert_track(track)
    result = db.get_track(100)
    assert result is not None
    assert result.title == "Test"


def test_upsert_track_updates_existing(db):
    track1 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Original")
    db.upsert_track(track1)
    track2 = _make_track(100, "2026-03-10T12:00:00+00:00", title="Updated")
    db.upsert_track(track2)
    result = db.get_track(100)
    assert result.title == "Updated"


def test_get_or_create_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    assert show.id is not None
    assert show.week_start == week_start

    # Calling again returns the same show
    show2 = db.get_or_create_show(week_start, week_end)
    assert show2.id == show.id


def test_set_show_tracks(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00", title="Second")
    db.upsert_track(t1)
    db.upsert_track(t2)

    db.set_show_tracks(show.id, [t1.id, t2.id])
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["position"] == 1
    assert tracks[0]["title"] == "First"
    assert tracks[1]["position"] == 2


def test_set_show_tracks_preserves_existing_positions(db):
    """Adding a new track doesn't change existing track positions."""
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])

    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    db.upsert_track(t2)
    db.set_show_tracks(show.id, [t1.id, t2.id])

    tracks = db.get_show_tracks(show.id)
    assert tracks[0]["track_id"] == 1
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 2
    assert tracks[1]["position"] == 2


def test_set_show_tracks_removes_unliked(db):
    """Tracks no longer in the likes list are removed and positions re-compact."""
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00", title="Second")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00", title="Third")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])

    # Nick unlikes track 2
    db.set_show_tracks(show.id, [t1.id, t3.id])
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["track_id"] == 1
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 3
    assert tracks[1]["position"] == 2  # re-compacted from 3


def test_get_show_track_by_position(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00", title="First")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])

    result = db.get_show_track_by_position(show.id, 1)
    assert result is not None
    assert result["title"] == "First"

    result_missing = db.get_show_track_by_position(show.id, 99)
    assert result_missing is None


def test_list_shows(db):
    s1 = db.get_or_create_show(
        datetime(2026, 3, 6, 3, 0, 0, tzinfo=timezone.utc),
        datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc),
    )
    s2 = db.get_or_create_show(
        datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc),
        datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc),
    )
    shows = db.list_shows(limit=10, offset=0)
    assert len(shows) == 2
    # Newest first
    assert shows[0].id == s2.id


def test_max_position_for_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    assert db.get_max_position(show.id) == 0

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])
    assert db.get_max_position(show.id) == 1


def test_remove_show_track(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])

    db.remove_show_track(show.id, 2)
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 2
    assert tracks[0]["position"] == 1
    assert tracks[0]["track_id"] == 1
    # Position re-compacted: track 3 moved from position 3 to 2
    assert tracks[1]["position"] == 2
    assert tracks[1]["track_id"] == 3


def test_move_show_track(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id, t3.id])

    # Move track 3 from position 3 to position 1
    db.move_show_track(show.id, track_id=3, new_position=1)
    tracks = db.get_show_tracks(show.id)
    assert tracks[0]["track_id"] == 3
    assert tracks[0]["position"] == 1
    assert tracks[1]["track_id"] == 1
    assert tracks[1]["position"] == 2
    assert tracks[2]["track_id"] == 2
    assert tracks[2]["position"] == 3


def test_add_track_to_show_at_position(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)

    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    t2 = _make_track(2, "2026-03-14T02:00:00+00:00")
    t3 = _make_track(3, "2026-03-14T03:00:00+00:00")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.upsert_track(t3)
    db.set_show_tracks(show.id, [t1.id, t2.id])

    # Insert track 3 at position 2 — pushes track 2 to position 3
    db.add_track_to_show(show.id, track_id=3, position=2)
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 3
    assert tracks[0]["track_id"] == 1
    assert tracks[1]["track_id"] == 3
    assert tracks[2]["track_id"] == 2


def test_has_track_in_show(db):
    week_start = datetime(2026, 3, 13, 2, 0, 0, tzinfo=timezone.utc)
    week_end = datetime(2026, 3, 20, 2, 0, 0, tzinfo=timezone.utc)
    show = db.get_or_create_show(week_start, week_end)
    t1 = _make_track(1, "2026-03-14T01:00:00+00:00")
    db.upsert_track(t1)
    db.set_show_tracks(show.id, [t1.id])

    assert db.has_track_in_show(show.id, 1) is True
    assert db.has_track_in_show(show.id, 999) is False

Step 2: Run test to verify it fails

Run: pytest tests/test_db.py -v Expected: FAIL — AttributeError: 'Database' object has no attribute 'upsert_track'
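
The reordering behaviour these tests pin down (1-based positions with no gaps) can be modelled on plain lists before any SQL is written. The sketch below uses hypothetical helper names and represents a show as a list of track ids in position order; it is a mental model, not the database code.

```python
def move(ordered: list[int], track_id: int, new_position: int) -> list[int]:
    """Move track_id to the 1-based new_position, shifting the others."""
    if track_id not in ordered:
        raise ValueError(f"track {track_id} is not in the show")
    result = [tid for tid in ordered if tid != track_id]
    result.insert(new_position - 1, track_id)
    return result


def insert_at(ordered: list[int], track_id: int, position: int) -> list[int]:
    """Insert track_id at the 1-based position, pushing later tracks down."""
    result = list(ordered)
    result.insert(position - 1, track_id)
    return result


def remove(ordered: list[int], track_id: int) -> list[int]:
    """Drop track_id; list indices close the gap automatically."""
    return [tid for tid in ordered if tid != track_id]


print(move([1, 2, 3], track_id=3, new_position=1))  # [3, 1, 2]
print(insert_at([1, 2], track_id=3, position=2))    # [1, 3, 2]
print(remove([1, 2, 3], track_id=2))                # [1, 3]
```

A track's position is simply its list index plus one, which is the invariant the SQL implementation has to maintain explicitly.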

Step 3: Write the query methods

Add these methods to the Database class in src/ntr_fetcher/db.py:

    def upsert_track(self, track: Track) -> None:
        conn = self._connect()
        conn.execute(
            """INSERT INTO tracks (id, title, artist, permalink_url, artwork_url, duration_ms, license, liked_at, raw_json)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                title=excluded.title,
                artist=excluded.artist,
                permalink_url=excluded.permalink_url,
                artwork_url=excluded.artwork_url,
                duration_ms=excluded.duration_ms,
                license=excluded.license,
                liked_at=excluded.liked_at,
                raw_json=excluded.raw_json
            """,
            (
                track.id,
                track.title,
                track.artist,
                track.permalink_url,
                track.artwork_url,
                track.duration_ms,
                track.license,
                track.liked_at.isoformat(),
                track.raw_json,
            ),
        )
        conn.commit()
        conn.close()

    def get_track(self, track_id: int) -> Track | None:
        conn = self._connect()
        row = conn.execute("SELECT * FROM tracks WHERE id = ?", (track_id,)).fetchone()
        conn.close()
        if row is None:
            return None
        return Track(
            id=row["id"],
            title=row["title"],
            artist=row["artist"],
            permalink_url=row["permalink_url"],
            artwork_url=row["artwork_url"],
            duration_ms=row["duration_ms"],
            license=row["license"],
            liked_at=datetime.fromisoformat(row["liked_at"]),
            raw_json=row["raw_json"],
        )

    def get_or_create_show(self, week_start: datetime, week_end: datetime) -> Show:
        conn = self._connect()
        start_iso = week_start.isoformat()
        end_iso = week_end.isoformat()
        row = conn.execute(
            "SELECT * FROM shows WHERE week_start = ? AND week_end = ?",
            (start_iso, end_iso),
        ).fetchone()
        if row is not None:
            conn.close()
            return Show(
                id=row["id"],
                week_start=datetime.fromisoformat(row["week_start"]),
                week_end=datetime.fromisoformat(row["week_end"]),
                created_at=datetime.fromisoformat(row["created_at"]),
            )
        now_iso = datetime.now(timezone.utc).isoformat()
        cursor = conn.execute(
            "INSERT INTO shows (week_start, week_end, created_at) VALUES (?, ?, ?)",
            (start_iso, end_iso, now_iso),
        )
        conn.commit()
        show = Show(
            id=cursor.lastrowid,
            week_start=week_start,
            week_end=week_end,
            created_at=datetime.fromisoformat(now_iso),
        )
        conn.close()
        return show

    def get_show_tracks(self, show_id: int) -> list[dict]:
        conn = self._connect()
        rows = conn.execute(
            """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url,
                      t.artwork_url, t.duration_ms, t.liked_at
            FROM show_tracks st
            JOIN tracks t ON st.track_id = t.id
            WHERE st.show_id = ?
            ORDER BY st.position""",
            (show_id,),
        ).fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def get_show_track_by_position(self, show_id: int, position: int) -> dict | None:
        conn = self._connect()
        row = conn.execute(
            """SELECT st.position, st.track_id, t.title, t.artist, t.permalink_url,
                      t.artwork_url, t.duration_ms, t.liked_at
            FROM show_tracks st
            JOIN tracks t ON st.track_id = t.id
            WHERE st.show_id = ? AND st.position = ?""",
            (show_id, position),
        ).fetchone()
        conn.close()
        return dict(row) if row else None

    def set_show_tracks(self, show_id: int, track_ids: list[int]) -> None:
        """Sync show tracks to match track_ids list.

        - Preserves positions of already-assigned tracks.
        - Appends new tracks after the current max position.
        - Removes tracks no longer in track_ids and re-compacts positions.
        """
        conn = self._connect()
        existing = conn.execute(
            "SELECT track_id, position FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        existing_map = {row["track_id"]: row["position"] for row in existing}

        track_id_set = set(track_ids)

        # Remove tracks that are no longer liked
        removed = [tid for tid in existing_map if tid not in track_id_set]
        for tid in removed:
            conn.execute(
                "DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?",
                (show_id, tid),
            )

        # Add new tracks at the end
        max_pos = max(existing_map.values()) if existing_map else 0
        for track_id in track_ids:
            if track_id not in existing_map:
                max_pos += 1
                conn.execute(
                    "INSERT OR IGNORE INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                    (show_id, track_id, max_pos),
                )

        # Re-compact positions if anything was removed
        if removed:
            rows = conn.execute(
                "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
                (show_id,),
            ).fetchall()
            for i, row in enumerate(rows, start=1):
                conn.execute(
                    "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                    (i, show_id, row["track_id"]),
                )

        conn.commit()
        conn.close()

    def get_max_position(self, show_id: int) -> int:
        conn = self._connect()
        row = conn.execute(
            "SELECT COALESCE(MAX(position), 0) as max_pos FROM show_tracks WHERE show_id = ?",
            (show_id,),
        ).fetchone()
        conn.close()
        return row["max_pos"]

    def list_shows(self, limit: int = 20, offset: int = 0) -> list[Show]:
        conn = self._connect()
        rows = conn.execute(
            "SELECT * FROM shows ORDER BY week_start DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        conn.close()
        return [
            Show(
                id=row["id"],
                week_start=datetime.fromisoformat(row["week_start"]),
                week_end=datetime.fromisoformat(row["week_end"]),
                created_at=datetime.fromisoformat(row["created_at"]),
            )
            for row in rows
        ]

    def has_track_in_show(self, show_id: int, track_id: int) -> bool:
        conn = self._connect()
        row = conn.execute(
            "SELECT 1 FROM show_tracks WHERE show_id = ? AND track_id = ?",
            (show_id, track_id),
        ).fetchone()
        conn.close()
        return row is not None

    def remove_show_track(self, show_id: int, track_id: int) -> None:
        conn = self._connect()
        conn.execute(
            "DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?",
            (show_id, track_id),
        )
        # Re-compact positions
        rows = conn.execute(
            "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        for i, row in enumerate(rows, start=1):
            conn.execute(
                "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                (i, show_id, row["track_id"]),
            )
        conn.commit()
        conn.close()

    def move_show_track(self, show_id: int, track_id: int, new_position: int) -> None:
        conn = self._connect()
        # Get current ordered list
        rows = conn.execute(
            "SELECT track_id FROM show_tracks WHERE show_id = ? ORDER BY position",
            (show_id,),
        ).fetchall()
        ordered = [row["track_id"] for row in rows]

        # Remove and reinsert at new position
        ordered.remove(track_id)
        ordered.insert(new_position - 1, track_id)

        for i, tid in enumerate(ordered, start=1):
            conn.execute(
                "UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
                (i, show_id, tid),
            )
        conn.commit()
        conn.close()

    def add_track_to_show(self, show_id: int, track_id: int, position: int | None = None) -> None:
        conn = self._connect()
        if position is None:
            max_pos = self.get_max_position(show_id)
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, max_pos + 1),
            )
        else:
            # Shift existing tracks at >= position up by 1
            conn.execute(
                "UPDATE show_tracks SET position = position + 1 WHERE show_id = ? AND position >= ?",
                (show_id, position),
            )
            conn.execute(
                "INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
                (show_id, track_id, position),
            )
        conn.commit()
        conn.close()

Note: from datetime import datetime, timezone and from ntr_fetcher.models import Track, Show, ShowTrack are already imported at the top of db.py.

Step 4: Run tests to verify they pass

Run: pytest tests/test_db.py -v Expected: All tests pass.
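
The sync rule documented on `set_show_tracks` (keep existing order, drop unliked tracks, append new ones, renumber) can be expressed as a pure function, which may be handy when reasoning about edge cases. `sync_show_tracks` is a hypothetical name used only here. Unlike the SQL version it always renumbers, which gives the same result whenever the stored positions are already gap-free.

```python
def sync_show_tracks(current: list[int], liked: list[int]) -> list[tuple[int, int]]:
    """Return (position, track_id) pairs after syncing a show to the liked list.

    current: track ids already in the show, in position order.
    liked:   track ids that should be in the show, in the order new ones are appended.
    """
    liked_set = set(liked)
    # Keep surviving tracks in their existing relative order.
    ordered = [tid for tid in current if tid in liked_set]
    seen = set(ordered)
    # Append newly liked tracks at the end, skipping duplicates.
    for tid in liked:
        if tid not in seen:
            ordered.append(tid)
            seen.add(tid)
    # Renumber from 1 so removals never leave gaps.
    return list(enumerate(ordered, start=1))


print(sync_show_tracks([1, 2, 3], [1, 3]))  # [(1, 1), (2, 3)]
print(sync_show_tracks([1], [1, 2]))        # [(1, 1), (2, 2)]
```

The first call mirrors the unlike scenario from the tests: track 2 disappears and track 3 moves from position 3 to position 2.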

Step 5: Commit

git add src/ntr_fetcher/db.py tests/test_db.py
git commit -m "feat: add database query methods for tracks, shows, and show_tracks"

Task 7: SoundCloud Client — client_id Extraction

Files:

  • Create: src/ntr_fetcher/soundcloud.py
  • Create: tests/test_soundcloud.py

Step 1: Write the failing test

# tests/test_soundcloud.py
import pytest
import httpx

from ntr_fetcher.soundcloud import SoundCloudClient


FAKE_HTML = """
<html><head><script>
window.__sc_hydration = [
  {"hydratable": "user", "data": {}},
  {"hydratable": "apiClient", "data": {"id": "test_client_id_abc123", "isExpiring": false}}
];
</script></head></html>
"""

FAKE_HTML_EXPIRING = """
<html><head><script>
window.__sc_hydration = [
  {"hydratable": "apiClient", "data": {"id": "expiring_id_xyz", "isExpiring": true}}
];
</script></head></html>
"""


@pytest.mark.asyncio
async def test_extract_client_id(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    client = SoundCloudClient()
    client_id = await client._extract_client_id()
    assert client_id == "test_client_id_abc123"


@pytest.mark.asyncio
async def test_extract_client_id_caches(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    client = SoundCloudClient()
    id1 = await client._extract_client_id()
    id2 = await client._extract_client_id()
    assert id1 == id2
    assert len(httpx_mock.get_requests()) == 1  # Only fetched once


@pytest.mark.asyncio
async def test_extract_client_id_bad_html(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text="<html>no hydration here</html>")
    client = SoundCloudClient()
    with pytest.raises(ValueError, match="client_id"):
        await client._extract_client_id()

Step 2: Run test to verify it fails

Run: pytest tests/test_soundcloud.py -v Expected: FAIL — ModuleNotFoundError

Step 3: Write the implementation

# src/ntr_fetcher/soundcloud.py
import json
import logging
import re

import httpx

logger = logging.getLogger(__name__)

SOUNDCLOUD_BASE = "https://soundcloud.com"
API_BASE = "https://api-v2.soundcloud.com"
HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL)


class SoundCloudClient:
    def __init__(self, http_client: httpx.AsyncClient | None = None):
        self._http = http_client or httpx.AsyncClient(timeout=15.0)
        self._client_id: str | None = None

    async def _extract_client_id(self) -> str:
        if self._client_id is not None:
            return self._client_id

        resp = await self._http.get(SOUNDCLOUD_BASE)
        resp.raise_for_status()
        match = HYDRATION_PATTERN.search(resp.text)
        if not match:
            raise ValueError("Could not find __sc_hydration in SoundCloud HTML — cannot extract client_id")

        hydration = json.loads(match.group(1))
        for entry in hydration:
            if entry.get("hydratable") == "apiClient":
                self._client_id = entry["data"]["id"]
                is_expiring = entry["data"].get("isExpiring", False)
                if is_expiring:
                    logger.warning("SoundCloud client_id is marked as expiring")
                return self._client_id

        raise ValueError("No apiClient entry in __sc_hydration — cannot extract client_id")

    def invalidate_client_id(self) -> None:
        self._client_id = None

    async def close(self) -> None:
        await self._http.aclose()

Step 4: Run test to verify it passes

Run: pytest tests/test_soundcloud.py -v Expected: 3 passed.

Step 5: Commit

git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py
git commit -m "feat: add SoundCloud client with client_id extraction"
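
The extraction step can also be exercised without any HTTP layer. The sketch below reuses the same regular expression as Step 3 on a made-up HTML snippet; `SAMPLE_HTML` and the standalone `extract_client_id` function are illustrative assumptions, not part of the plan's module.

```python
import json
import re

# Same pattern as in the plan's soundcloud.py.
HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL)

# Made-up page content for illustration only.
SAMPLE_HTML = """
<script>
window.__sc_hydration = [
  {"hydratable": "user", "data": {}},
  {"hydratable": "apiClient", "data": {"id": "sample_id_123", "isExpiring": false}}
];
</script>
"""


def extract_client_id(html: str) -> str:
    match = HYDRATION_PATTERN.search(html)
    if match is None:
        raise ValueError("no __sc_hydration block found")
    # The captured group is a JSON array of hydration entries.
    for entry in json.loads(match.group(1)):
        if entry.get("hydratable") == "apiClient":
            return entry["data"]["id"]
    raise ValueError("no apiClient entry found")
```

One caveat of the lazy `.*?` match: it stops at the first `]` that is followed by `;`, so a string value containing `];` inside the hydration data would truncate the capture and make `json.loads` fail.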

Task 8: SoundCloud Client — Resolve User & Fetch Likes

Files:

  • Modify: src/ntr_fetcher/soundcloud.py
  • Modify: tests/test_soundcloud.py

Step 1: Write the failing tests

Append to tests/test_soundcloud.py, placing the new imports alongside the existing ones at the top of the file (the tests below use re.compile, so re must be imported):

import re
from datetime import datetime, timezone


FAKE_RESOLVE_RESPONSE = {"id": 206979918, "kind": "user", "username": "NICKtheRAT"}

FAKE_LIKES_RESPONSE = {
    "collection": [
        {
            "created_at": "2026-03-09T02:25:43Z",
            "kind": "like",
            "track": {
                "id": 12345,
                "title": "Test Track",
                "permalink_url": "https://soundcloud.com/artist/test-track",
                "duration": 180000,
                "full_duration": 180000,
                "genre": "Electronic",
                "tag_list": "",
                "created_at": "2026-03-01T00:00:00Z",
                "description": "",
                "artwork_url": "https://i1.sndcdn.com/artworks-abc-large.jpg",
                "license": "cc-by",
                "user": {
                    "id": 999,
                    "username": "TestArtist",
                    "permalink_url": "https://soundcloud.com/testartist",
                },
                "media": {"transcodings": []},
            },
        }
    ],
    "next_href": None,
}


@pytest.mark.asyncio
async def test_resolve_user(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/resolve.*"),
        json=FAKE_RESOLVE_RESPONSE,
    )
    client = SoundCloudClient()
    user_id = await client.resolve_user("nicktherat")
    assert user_id == 206979918


@pytest.mark.asyncio
async def test_fetch_likes(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 1
    assert tracks[0].title == "Test Track"
    assert tracks[0].artist == "TestArtist"
    assert tracks[0].id == 12345


@pytest.mark.asyncio
async def test_fetch_likes_filters_outside_range(httpx_mock):
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    # Range that excludes the track (liked at 2026-03-09)
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 12, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 0


@pytest.mark.asyncio
async def test_fetch_likes_retries_on_401(httpx_mock):
    """On 401, client_id is refreshed and the request is retried."""
    httpx_mock.add_response(url="https://soundcloud.com", text=FAKE_HTML)
    # First likes call returns 401
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        status_code=401,
    )
    # Re-extraction gets new client_id
    httpx_mock.add_response(
        url="https://soundcloud.com",
        text=FAKE_HTML.replace("test_client_id_abc123", "new_client_id_456"),
    )
    # Retry succeeds
    httpx_mock.add_response(
        url=re.compile(r"https://api-v2\.soundcloud\.com/users/206979918/likes.*"),
        json=FAKE_LIKES_RESPONSE,
    )
    client = SoundCloudClient()
    tracks = await client.fetch_likes(
        user_id=206979918,
        since=datetime(2026, 3, 1, 0, 0, 0, tzinfo=timezone.utc),
        until=datetime(2026, 3, 10, 0, 0, 0, tzinfo=timezone.utc),
    )
    assert len(tracks) == 1

Step 2: Run test to verify it fails

Run: pytest tests/test_soundcloud.py -v Expected: FAIL — AttributeError: 'SoundCloudClient' object has no attribute 'resolve_user'

Step 3: Write the implementation

Add to SoundCloudClient in src/ntr_fetcher/soundcloud.py:

    async def _api_get(self, url: str, params: dict | None = None) -> httpx.Response:
        """Make an API GET request with automatic client_id injection and 401 retry."""
        client_id = await self._extract_client_id()
        params = dict(params or {})
        params["client_id"] = client_id

        for attempt in range(3):
            resp = await self._http.get(url, params=params)
            if resp.status_code == 401:
                logger.warning("Got 401 from SoundCloud API, refreshing client_id (attempt %d)", attempt + 1)
                self.invalidate_client_id()
                client_id = await self._extract_client_id()
                params["client_id"] = client_id
                continue
            resp.raise_for_status()
            return resp

        raise httpx.HTTPStatusError(
            "Failed after 3 attempts (401)",
            request=resp.request,
            response=resp,
        )

    async def resolve_user(self, username: str) -> int:
        resp = await self._api_get(
            f"{API_BASE}/resolve",
            params={"url": f"{SOUNDCLOUD_BASE}/{username}"},
        )
        return resp.json()["id"]

    async def fetch_likes(
        self,
        user_id: int,
        since: datetime,
        until: datetime,
        limit: int = 50,
    ) -> list["Track"]:
        from ntr_fetcher.models import Track

        cursor = _build_cursor(until, user_id)
        collected: list[Track] = []

        while True:
            params = {"limit": limit}
            if cursor:
                params["offset"] = cursor

            resp = await self._api_get(f"{API_BASE}/users/{user_id}/likes", params=params)
            data = resp.json()
            collection = data.get("collection", [])

            if not collection:
                break

            stop = False
            for item in collection:
                liked_at_str = item.get("created_at", "")
                liked_at = datetime.fromisoformat(liked_at_str.replace("Z", "+00:00"))

                if liked_at < since:
                    stop = True
                    break

                if liked_at > until:
                    continue

                track_data = item.get("track")
                if track_data is None:
                    continue

                user_data = track_data.get("user", {})
                collected.append(
                    Track(
                        id=track_data["id"],
                        title=track_data["title"],
                        artist=user_data.get("username", "Unknown"),
                        permalink_url=track_data["permalink_url"],
                        artwork_url=track_data.get("artwork_url"),
                        duration_ms=track_data.get("full_duration", track_data.get("duration", 0)),
                        license=track_data.get("license", ""),
                        liked_at=liked_at,
                        raw_json=json.dumps(track_data),
                    )
                )

            if stop:
                break

            next_href = data.get("next_href")
            if not next_href:
                break

            # Extract cursor from next_href
            from urllib.parse import urlparse, parse_qs
            parsed = urlparse(next_href)
            qs = parse_qs(parsed.query)
            cursor = qs.get("offset", [None])[0]
            if cursor is None:
                break

        # Return in chronological order (oldest first)
        collected.sort(key=lambda t: t.liked_at)
        return collected

Also add this module-level helper and the missing import at the top of the file:

from datetime import datetime


def _build_cursor(until: datetime, user_id: int) -> str:
    ts = until.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    padded_user = str(user_id).zfill(22)
    raw = f"{ts},user-track-likes,000-{padded_user}-99999999999999999999"
    return raw

Step 4: Run test to verify it passes

Run: pytest tests/test_soundcloud.py -v Expected: All tests pass.

Step 5: Commit

git add src/ntr_fetcher/soundcloud.py tests/test_soundcloud.py
git commit -m "feat: add user resolution and likes fetching with 401 retry"
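
The refresh-and-retry pattern used by `_api_get` can be shown in isolation. This is a simplified synchronous sketch under stated assumptions: `Unauthorized` stands in for an HTTP 401, and `fetch` and `refresh_credential` are hypothetical callables rather than anything from httpx or the plan's client.

```python
from typing import Callable


class Unauthorized(Exception):
    """Stand-in for an HTTP 401 response (hypothetical, for this sketch only)."""


def get_with_refresh(
    fetch: Callable[[str], str],
    refresh_credential: Callable[[], str],
    credential: str,
    max_attempts: int = 3,
) -> str:
    for _ in range(max_attempts):
        try:
            return fetch(credential)
        except Unauthorized:
            # The credential was rejected: obtain a new one and try again.
            credential = refresh_credential()
    raise Unauthorized(f"still unauthorized after {max_attempts} attempts")
```

Bounding the loop matters here: if SoundCloud keeps rejecting freshly extracted IDs, the caller gets an error after a fixed number of attempts instead of hammering the site.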

Task 9: Poller & Supervisor

Files:

  • Create: src/ntr_fetcher/poller.py
  • Create: tests/test_poller.py

Step 1: Write the failing test

# tests/test_poller.py
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock

import pytest

from ntr_fetcher.poller import Poller
from ntr_fetcher.models import Track


def _make_track(id: int, liked_at: str) -> Track:
    return Track(
        id=id,
        title=f"Track {id}",
        artist="Artist",
        permalink_url=f"https://soundcloud.com/a/t-{id}",
        artwork_url=None,
        duration_ms=180000,
        license="cc-by",
        liked_at=datetime.fromisoformat(liked_at),
        raw_json="{}",
    )


@pytest.mark.asyncio
async def test_poll_once_fetches_and_stores():
    mock_sc = AsyncMock()
    mock_sc.resolve_user.return_value = 206979918
    mock_sc.fetch_likes.return_value = [
        _make_track(1, "2026-03-14T01:00:00+00:00"),
        _make_track(2, "2026-03-14T02:00:00+00:00"),
    ]

    mock_db = MagicMock()
    mock_show = MagicMock()
    mock_show.id = 1
    mock_db.get_or_create_show.return_value = mock_show

    poller = Poller(
        db=mock_db,
        soundcloud=mock_sc,
        soundcloud_user="nicktherat",
        show_day=2,
        show_hour=22,
        poll_interval=3600,
    )

    await poller.poll_once()

    assert mock_sc.resolve_user.called
    assert mock_sc.fetch_likes.called
    assert mock_db.upsert_track.call_count == 2
    assert mock_db.set_show_tracks.called
    call_args = mock_db.set_show_tracks.call_args
    assert call_args[0][0] == 1  # show_id
    assert call_args[0][1] == [1, 2]  # track_ids in order


@pytest.mark.asyncio
async def test_poll_once_removes_unliked_tracks():
    """When a track disappears from likes, it should be removed from the show."""
    mock_sc = AsyncMock()
    mock_sc.resolve_user.return_value = 206979918
    # First poll: two tracks
    mock_sc.fetch_likes.return_value = [
        _make_track(1, "2026-03-14T01:00:00+00:00"),
        _make_track(2, "2026-03-14T02:00:00+00:00"),
    ]

    mock_db = MagicMock()
    mock_show = MagicMock()
    mock_show.id = 1
    mock_db.get_or_create_show.return_value = mock_show

    poller = Poller(
        db=mock_db,
        soundcloud=mock_sc,
        soundcloud_user="nicktherat",
        show_day=2,
        show_hour=22,
        poll_interval=3600,
    )

    await poller.poll_once()

    # Second poll: Nick unliked track 2
    mock_sc.fetch_likes.return_value = [
        _make_track(1, "2026-03-14T01:00:00+00:00"),
    ]
    mock_db.reset_mock()
    mock_db.get_or_create_show.return_value = mock_show

    await poller.poll_once()

    call_args = mock_db.set_show_tracks.call_args
    assert call_args[0][1] == [1]  # only track 1 remains


@pytest.mark.asyncio
async def test_poll_once_full_refresh():
    mock_sc = AsyncMock()
    mock_sc.resolve_user.return_value = 206979918
    mock_sc.fetch_likes.return_value = [
        _make_track(1, "2026-03-14T01:00:00+00:00"),
    ]

    mock_db = MagicMock()
    mock_show = MagicMock()
    mock_show.id = 1
    mock_db.get_or_create_show.return_value = mock_show

    poller = Poller(
        db=mock_db,
        soundcloud=mock_sc,
        soundcloud_user="nicktherat",
        show_day=2,
        show_hour=22,
        poll_interval=3600,
    )

    await poller.poll_once(full=True)

    # full=True should still call fetch_likes
    assert mock_sc.fetch_likes.called


@pytest.mark.asyncio
async def test_supervisor_restarts_poller_on_failure():
    call_count = 0

    async def failing_poll():
        nonlocal call_count
        call_count += 1
        if call_count <= 2:
            raise RuntimeError("Simulated failure")

    mock_sc = AsyncMock()
    mock_db = MagicMock()

    poller = Poller(
        db=mock_db,
        soundcloud=mock_sc,
        soundcloud_user="nicktherat",
        show_day=2,
        show_hour=22,
        poll_interval=0.01,  # Very short for testing
    )
    poller.poll_once = failing_poll

    task = asyncio.create_task(poller.run_supervised(restart_delay=0.01))
    await asyncio.sleep(0.2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    assert call_count >= 3  # Failed twice, succeeded at least once

Step 2: Run test to verify it fails

Run: pytest tests/test_poller.py -v Expected: FAIL — ModuleNotFoundError

Step 3: Write the implementation

# src/ntr_fetcher/poller.py
import asyncio
import logging
from datetime import datetime, timezone

from ntr_fetcher.db import Database
from ntr_fetcher.soundcloud import SoundCloudClient
from ntr_fetcher.week import get_show_week

logger = logging.getLogger(__name__)


class Poller:
    def __init__(
        self,
        db: Database,
        soundcloud: SoundCloudClient,
        soundcloud_user: str,
        show_day: int,
        show_hour: int,
        poll_interval: float,
    ):
        self._db = db
        self._sc = soundcloud
        self._user = soundcloud_user
        self._show_day = show_day
        self._show_hour = show_hour
        self._poll_interval = poll_interval
        self._user_id: int | None = None
        self.last_fetch: datetime | None = None
        self.alive = True

    async def _get_user_id(self) -> int:
        if self._user_id is None:
            self._user_id = await self._sc.resolve_user(self._user)
        return self._user_id

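    async def poll_once(self, full: bool = False) -> None:
        # `full` is accepted for the admin API but currently unused: every poll
        # already refetches the whole show week and replaces the show's tracks.
        user_id = await self._get_user_id()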
        now = datetime.now(timezone.utc)
        week_start, week_end = get_show_week(now, self._show_day, self._show_hour)
        show = self._db.get_or_create_show(week_start, week_end)

        tracks = await self._sc.fetch_likes(
            user_id=user_id,
            since=week_start,
            until=week_end,
        )

        for track in tracks:
            self._db.upsert_track(track)

        track_ids = [t.id for t in tracks]
        self._db.set_show_tracks(show.id, track_ids)
        self.last_fetch = datetime.now(timezone.utc)
        logger.info("Fetched %d tracks for show %d", len(tracks), show.id)

    async def run_supervised(self, restart_delay: float = 30.0) -> None:
        while True:
            try:
                self.alive = True
                await self.poll_once()
                await asyncio.sleep(self._poll_interval)
            except asyncio.CancelledError:
                self.alive = False
                raise
            except Exception:
                self.alive = False
                logger.exception("Poller failed, restarting in %.1fs", restart_delay)
                await asyncio.sleep(restart_delay)

Step 4: Run test to verify it passes

Run: pytest tests/test_poller.py -v Expected: 4 passed.

Step 5: Commit

git add src/ntr_fetcher/poller.py tests/test_poller.py
git commit -m "feat: add poller with supervised restart loop"
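
The supervised loop can be seen on its own with plain asyncio. The sketch below mirrors the shape of `run_supervised` but takes the job as a parameter; `supervise`, `demo`, and `flaky_job` are illustrative names, and the short delays are chosen only to make the demo finish quickly.

```python
import asyncio


async def supervise(job, interval: float, restart_delay: float) -> None:
    """Run `job` forever: sleep `interval` after a success, `restart_delay` after a failure."""
    while True:
        try:
            await job()
            await asyncio.sleep(interval)
        except asyncio.CancelledError:
            raise  # let cancellation end the loop
        except Exception:
            await asyncio.sleep(restart_delay)


async def demo() -> int:
    calls = 0

    async def flaky_job() -> None:
        nonlocal calls
        calls += 1
        if calls <= 2:
            raise RuntimeError("simulated failure")

    task = asyncio.create_task(supervise(flaky_job, interval=0.01, restart_delay=0.01))
    await asyncio.sleep(0.2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return calls
```

Re-raising `CancelledError` before the broad `except Exception` keeps shutdown prompt: cancelling the task stops the loop instead of being treated as one more failure to recover from.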

Task 10: API Routes

Files:

  • Create: src/ntr_fetcher/api.py
  • Create: tests/test_api.py

Step 1: Write the failing tests

# tests/test_api.py
from datetime import datetime, timezone

import pytest
from fastapi.testclient import TestClient

from ntr_fetcher.api import create_app
from ntr_fetcher.db import Database
from ntr_fetcher.models import Track


@pytest.fixture
def db(tmp_path):
    database = Database(str(tmp_path / "test.db"))
    database.initialize()
    return database


@pytest.fixture
def app(db):
    from unittest.mock import AsyncMock, MagicMock
    poller = MagicMock()
    poller.last_fetch = datetime(2026, 3, 12, 12, 0, 0, tzinfo=timezone.utc)
    poller.alive = True
    poller.poll_once = AsyncMock()
    return create_app(db=db, poller=poller, admin_token="test-token")


@pytest.fixture
def client(app):
    return TestClient(app)


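def _seed_show(db):
    # Seed the week containing "now" so it is the show that the API's
    # _current_show() resolves; fixed dates would only pass during one week.
    from ntr_fetcher.week import get_show_week
    now = datetime.now(timezone.utc)
    week_start, week_end = get_show_week(now, show_day=2, show_hour=22)
    show = db.get_or_create_show(week_start, week_end)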
    t1 = Track(1, "Song A", "Artist A", "https://soundcloud.com/a/1", None, 180000, "cc-by",
               datetime(2026, 3, 14, 1, 0, 0, tzinfo=timezone.utc), "{}")
    t2 = Track(2, "Song B", "Artist B", "https://soundcloud.com/b/2", None, 200000, "cc-by-sa",
               datetime(2026, 3, 14, 2, 0, 0, tzinfo=timezone.utc), "{}")
    db.upsert_track(t1)
    db.upsert_track(t2)
    db.set_show_tracks(show.id, [t1.id, t2.id])
    return show


def test_health(client):
    resp = client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert data["status"] == "ok"
    assert data["poller_alive"] is True


def test_playlist(client, db):
    _seed_show(db)
    resp = client.get("/playlist")
    assert resp.status_code == 200
    data = resp.json()
    assert len(data["tracks"]) == 2
    assert data["tracks"][0]["position"] == 1
    assert data["tracks"][0]["title"] == "Song A"


def test_playlist_by_position(client, db):
    _seed_show(db)
    resp = client.get("/playlist/2")
    assert resp.status_code == 200
    assert resp.json()["title"] == "Song B"


def test_playlist_by_position_not_found(client, db):
    _seed_show(db)
    resp = client.get("/playlist/99")
    assert resp.status_code == 404


def test_shows_list(client, db):
    _seed_show(db)
    resp = client.get("/shows")
    assert resp.status_code == 200
    assert len(resp.json()) >= 1


def test_shows_detail(client, db):
    show = _seed_show(db)
    resp = client.get(f"/shows/{show.id}")
    assert resp.status_code == 200
    assert len(resp.json()["tracks"]) == 2


def test_admin_refresh_requires_token(client):
    resp = client.post("/admin/refresh")
    assert resp.status_code == 401


def test_admin_refresh_with_token(client):
    resp = client.post(
        "/admin/refresh",
        headers={"Authorization": "Bearer test-token"},
        json={"full": False},
    )
    assert resp.status_code == 200


def test_admin_remove_track(client, db):
    show = _seed_show(db)
    resp = client.delete(
        "/admin/tracks/1",
        headers={"Authorization": "Bearer test-token"},
    )
    assert resp.status_code == 200
    tracks = db.get_show_tracks(show.id)
    assert len(tracks) == 1

Step 2: Run test to verify it fails

Run: pytest tests/test_api.py -v Expected: FAIL — ModuleNotFoundError

Step 3: Write the implementation

# src/ntr_fetcher/api.py
import logging
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel

from ntr_fetcher.db import Database
from ntr_fetcher.week import get_show_week

logger = logging.getLogger(__name__)


class RefreshRequest(BaseModel):
    full: bool = False


class AddTrackRequest(BaseModel):
    soundcloud_url: str | None = None
    track_id: int | None = None
    position: int | None = None


class MoveTrackRequest(BaseModel):
    position: int


def create_app(db: Database, poller, admin_token: str) -> FastAPI:
    app = FastAPI(title="NtR SoundCloud Fetcher")

    def _require_admin(authorization: str | None = Header(None)):
        if authorization is None or not authorization.startswith("Bearer "):
            raise HTTPException(status_code=401, detail="Missing or invalid token")
        token = authorization.removeprefix("Bearer ")
        if token != admin_token:
            raise HTTPException(status_code=401, detail="Invalid token")

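    def _current_show():
        now = datetime.now(timezone.utc)
        # Schedule is hard-coded here (Wednesday 22:00); create_app does not receive
        # Settings.show_day / Settings.show_hour, so keep these in sync with the poller.
        week_start, week_end = get_show_week(now, show_day=2, show_hour=22)
        return db.get_or_create_show(week_start, week_end)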

    @app.get("/health")
    def health():
        show = _current_show()
        tracks = db.get_show_tracks(show.id)
        return {
            "status": "ok",
            "poller_alive": poller.alive,
            "last_fetch": poller.last_fetch.isoformat() if poller.last_fetch else None,
            "current_week_track_count": len(tracks),
        }

    @app.get("/playlist")
    def playlist():
        show = _current_show()
        tracks = db.get_show_tracks(show.id)
        return {
            "show_id": show.id,
            "week_start": show.week_start.isoformat(),
            "week_end": show.week_end.isoformat(),
            "tracks": tracks,
        }

    @app.get("/playlist/{position}")
    def playlist_track(position: int):
        show = _current_show()
        track = db.get_show_track_by_position(show.id, position)
        if track is None:
            raise HTTPException(status_code=404, detail=f"No track at position {position}")
        return track

    @app.get("/shows")
    def list_shows(limit: int = 20, offset: int = 0):
        shows = db.list_shows(limit=limit, offset=offset)
        return [
            {
                "id": s.id,
                "week_start": s.week_start.isoformat(),
                "week_end": s.week_end.isoformat(),
                "created_at": s.created_at.isoformat(),
            }
            for s in shows
        ]

    @app.get("/shows/{show_id}")
    def show_detail(show_id: int):
        shows = db.list_shows(limit=1000, offset=0)
        show = next((s for s in shows if s.id == show_id), None)
        if show is None:
            raise HTTPException(status_code=404, detail="Show not found")
        tracks = db.get_show_tracks(show.id)
        return {
            "show_id": show.id,
            "week_start": show.week_start.isoformat(),
            "week_end": show.week_end.isoformat(),
            "tracks": tracks,
        }

    @app.post("/admin/refresh")
    async def admin_refresh(body: RefreshRequest = RefreshRequest(), _=Depends(_require_admin)):
        await poller.poll_once(full=body.full)
        show = _current_show()
        tracks = db.get_show_tracks(show.id)
        return {"status": "refreshed", "track_count": len(tracks)}

    @app.post("/admin/tracks")
    def admin_add_track(body: AddTrackRequest, _=Depends(_require_admin)):
        show = _current_show()
        if body.track_id is not None:
            db.add_track_to_show(show.id, body.track_id, body.position)
            return {"status": "added"}
        raise HTTPException(status_code=400, detail="Provide track_id")

    @app.delete("/admin/tracks/{track_id}")
    def admin_remove_track(track_id: int, _=Depends(_require_admin)):
        show = _current_show()
        if not db.has_track_in_show(show.id, track_id):
            raise HTTPException(status_code=404, detail="Track not in current show")
        db.remove_show_track(show.id, track_id)
        return {"status": "removed"}

    @app.put("/admin/tracks/{track_id}/position")
    def admin_move_track(track_id: int, body: MoveTrackRequest, _=Depends(_require_admin)):
        show = _current_show()
        if not db.has_track_in_show(show.id, track_id):
            raise HTTPException(status_code=404, detail="Track not in current show")
        db.move_show_track(show.id, track_id, body.position)
        return {"status": "moved"}

    return app

Step 4: Run test to verify it passes

Run: pytest tests/test_api.py -v Expected: All tests pass.

Step 5: Commit

git add src/ntr_fetcher/api.py tests/test_api.py
git commit -m "feat: add FastAPI routes for playlist, shows, admin, and health"

Task 11: Main Entry Point

Files:

  • Create: src/ntr_fetcher/main.py

Step 1: Write the entry point

# src/ntr_fetcher/main.py
import asyncio
import logging

import uvicorn

from ntr_fetcher.api import create_app
from ntr_fetcher.config import Settings
from ntr_fetcher.db import Database
from ntr_fetcher.poller import Poller
from ntr_fetcher.soundcloud import SoundCloudClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


def run() -> None:
    settings = Settings()

    db = Database(settings.db_path)
    db.initialize()
    logger.info("Database initialized at %s", settings.db_path)

    sc = SoundCloudClient()
    poller = Poller(
        db=db,
        soundcloud=sc,
        soundcloud_user=settings.soundcloud_user,
        show_day=settings.show_day,
        show_hour=settings.show_hour,
        poll_interval=settings.poll_interval_seconds,
    )

    app = create_app(db=db, poller=poller, admin_token=settings.admin_token)

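    poller_task: asyncio.Task | None = None

    @app.on_event("startup")
    async def start_poller():
        nonlocal poller_task
        logger.info("Starting poller (interval=%ds)", settings.poll_interval_seconds)
        # Keep a reference: the event loop only holds weak references to tasks.
        poller_task = asyncio.create_task(poller.run_supervised())

    @app.on_event("shutdown")
    async def shutdown():
        logger.info("Shutting down")
        if poller_task is not None:
            poller_task.cancel()
        await sc.close()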

    uvicorn.run(app, host=settings.host, port=settings.port)


if __name__ == "__main__":
    run()

Step 2: Verify it at least imports

Run: python -c "from ntr_fetcher.main import run; print('OK')" Expected: OK

Step 3: Commit

git add src/ntr_fetcher/main.py
git commit -m "feat: add main entry point wiring API server and poller"
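
Recent FastAPI versions mark `on_event` as deprecated in favour of a `lifespan` argument on the app. The sketch below shows the lifespan shape using only the standard library, so it runs without FastAPI installed; `make_lifespan`, `run_forever`, and `on_close` are hypothetical names, and passing the result to `FastAPI(lifespan=...)` is an assumption about how it would be wired in.

```python
import asyncio
from contextlib import asynccontextmanager


def make_lifespan(run_forever, on_close):
    """Build a lifespan-style context manager around a long-running coroutine function."""

    @asynccontextmanager
    async def lifespan(app):
        # Hold a strong reference so the task cannot be garbage-collected mid-run.
        task = asyncio.create_task(run_forever())
        try:
            yield
        finally:
            # Stop the background task, wait for it to finish, then release resources.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            await on_close()

    return lifespan
```

In this plan's terms, `run_forever` would be `poller.run_supervised` and `on_close` would be `sc.close`.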

Task 12: README

Files:

  • Create: README.md

Step 1: Write the README

# NtR SoundCloud Fetcher

Fetches SoundCloud likes from NicktheRat's profile, builds weekly playlists
aligned to the Wednesday 22:00 ET show schedule, and serves them via a JSON
API.

## Quick Start

```bash
pip install -e ".[dev]"
export NTR_ADMIN_TOKEN="your-secret-here"
ntr-fetcher
```

The API starts at http://127.0.0.1:8000.

## API

| Endpoint | Description |
| --- | --- |
| `GET /playlist` | Current week's playlist |
| `GET /playlist/{n}` | Track at position `n` |
| `GET /shows` | List all shows |
| `GET /shows/{id}` | Specific show's playlist |
| `GET /health` | Service health check |
| `POST /admin/refresh` | Trigger SoundCloud fetch (token required) |

## Configuration

Environment variables (prefix `NTR_`):

| Variable | Default | Description |
| --- | --- | --- |
| `NTR_PORT` | `8000` | API port |
| `NTR_HOST` | `127.0.0.1` | Bind address |
| `NTR_DB_PATH` | `./ntr_fetcher.db` | SQLite path |
| `NTR_POLL_INTERVAL_SECONDS` | `3600` | Poll frequency |
| `NTR_ADMIN_TOKEN` | (required) | Admin bearer token |
| `NTR_SOUNDCLOUD_USER` | `nicktherat` | SoundCloud user |

## Development

```bash
pip install -e ".[dev]"
pytest
```

Step 2: Commit

git add README.md
git commit -m "docs: add README with quick start and API reference"

Task 13: Run Full Test Suite

Step 1: Run all tests

Run: pytest -v Expected: All tests pass.

Step 2: Run ruff linter

Run: ruff check src/ tests/ Expected: No errors (or fix any that appear).

Step 3: Final commit if any fixes were needed

git add -A
git commit -m "fix: address linting issues"