From e90f44439bd3e1962175ecc677e6be8768804f83 Mon Sep 17 00:00:00 2001 From: cottongin Date: Thu, 12 Mar 2026 07:17:20 -0400 Subject: [PATCH] Add dashboard router with session auth, announce endpoint, and WebSocket handler - Login/logout/dashboard with HMAC-signed session cookies - POST /admin/announce with session or bearer auth - WS /ws/announce for subscribe/broadcast - Static stubs: login.html, dashboard.html Made-with: Cursor --- src/ntr_fetcher/dashboard.py | 147 +++++++++++++++++++++ src/ntr_fetcher/static/dashboard.html | 5 + src/ntr_fetcher/static/login.html | 10 ++ tests/test_dashboard.py | 178 ++++++++++++++++++++++++++ 4 files changed, 340 insertions(+) create mode 100644 src/ntr_fetcher/dashboard.py create mode 100644 src/ntr_fetcher/static/dashboard.html create mode 100644 src/ntr_fetcher/static/login.html create mode 100644 tests/test_dashboard.py diff --git a/src/ntr_fetcher/dashboard.py b/src/ntr_fetcher/dashboard.py new file mode 100644 index 0000000..3cc3bbd --- /dev/null +++ b/src/ntr_fetcher/dashboard.py @@ -0,0 +1,147 @@ +import hashlib +import hmac +import json +import logging +import time +from pathlib import Path + +from fastapi import APIRouter, Request, HTTPException, Form, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse, RedirectResponse +from pydantic import BaseModel + +from ntr_fetcher.db import Database +from ntr_fetcher.websocket import AnnounceManager + +logger = logging.getLogger(__name__) + +STATIC_DIR = Path(__file__).parent / "static" +SESSION_MAX_AGE = 86400 + + +def _sign(data: str, secret: str) -> str: + sig = hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest() + return f"{data}.{sig}" + + +def _unsign(signed: str, secret: str, max_age: int = SESSION_MAX_AGE) -> str | None: + if "." not in signed: + return None + data, sig = signed.rsplit(".", 1) + expected = hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest() + if not hmac.compare_digest(sig, expected): + return None + try: + payload = json.loads(data) + if time.time() - payload.get("t", 0) > max_age: + return None + return payload.get("user") + except (json.JSONDecodeError, KeyError): + return None + + +class AnnounceRequest(BaseModel): + show_id: int + position: int + + +def create_dashboard_router( + db: Database, + manager: AnnounceManager, + admin_token: str, + web_user: str, + web_password: str, + secret_key: str, + show_day: int = 2, + show_hour: int = 22, +) -> APIRouter: + router = APIRouter() + + def _get_session_user(request: Request) -> str | None: + cookie = request.cookies.get("ntr_session") + if not cookie: + return None + return _unsign(cookie, secret_key) + + @router.get("/login", response_class=HTMLResponse) + def login_page(): + html = (STATIC_DIR / "login.html").read_text() + return HTMLResponse(html) + + @router.post("/login") + def login_submit(username: str = Form(...), password: str = Form(...)): + if username == web_user and password == web_password: + payload = json.dumps({"user": username, "t": int(time.time())}) + cookie_value = _sign(payload, secret_key) + response = RedirectResponse(url="/dashboard", status_code=303) + response.set_cookie( + "ntr_session", + cookie_value, + max_age=SESSION_MAX_AGE, + httponly=True, + samesite="lax", + ) + return response + html = (STATIC_DIR / "login.html").read_text() + html = html.replace("", '

Invalid username or password

') + return HTMLResponse(html, status_code=200) + + @router.get("/logout") + def logout(): + response = RedirectResponse(url="/login", status_code=303) + response.delete_cookie("ntr_session") + return response + + @router.get("/dashboard", response_class=HTMLResponse) + def dashboard(request: Request): + user = _get_session_user(request) + if user is None: + return RedirectResponse(url="/login", status_code=303) + html = (STATIC_DIR / "dashboard.html").read_text() + html = html.replace("{{WS_TOKEN}}", admin_token) + return HTMLResponse(html) + + @router.post("/admin/announce") + async def announce(body: AnnounceRequest, request: Request): + user = _get_session_user(request) + if user is None: + auth_header = request.headers.get("authorization", "") + if not auth_header.startswith("Bearer ") or auth_header.removeprefix("Bearer ") != admin_token: + raise HTTPException(status_code=401, detail="Unauthorized") + + track = db.get_show_track_by_position(body.show_id, body.position) + if track is None: + raise HTTPException(status_code=404, detail=f"No track at position {body.position}") + + message = ( + f"Now Playing: Song #{track['position']}: " + f"{track['title']} by {track['artist']} - {track['permalink_url']}" + ) + await manager.broadcast({"type": "announce", "message": message}) + return {"status": "announced", "message": message} + + @router.websocket("/ws/announce") + async def ws_announce(websocket: WebSocket): + await websocket.accept() + try: + auth_msg = await websocket.receive_json() + if auth_msg.get("type") != "subscribe" or auth_msg.get("token") != admin_token: + await websocket.close(code=4001, reason="Invalid token") + return + + manager.add_subscriber(websocket) + await manager.broadcast_status() + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + except WebSocketDisconnect: + pass + finally: + manager.remove_subscriber(websocket) + try: + await manager.broadcast_status() + except Exception: + pass + + return router diff --git a/src/ntr_fetcher/static/dashboard.html b/src/ntr_fetcher/static/dashboard.html new file mode 100644 index 0000000..569c1df --- /dev/null +++ b/src/ntr_fetcher/static/dashboard.html @@ -0,0 +1,5 @@ + +Dashboard +

NtR Playlist Dashboard

+ + diff --git a/src/ntr_fetcher/static/login.html b/src/ntr_fetcher/static/login.html new file mode 100644 index 0000000..52b87e9 --- /dev/null +++ b/src/ntr_fetcher/static/login.html @@ -0,0 +1,10 @@ + +Login + + +
+ + + +
+ diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py new file mode 100644 index 0000000..9a999d5 --- /dev/null +++ b/tests/test_dashboard.py @@ -0,0 +1,178 @@ +from datetime import datetime, timezone + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from ntr_fetcher.dashboard import create_dashboard_router +from ntr_fetcher.db import Database +from ntr_fetcher.models import Track +from ntr_fetcher.websocket import AnnounceManager + + +@pytest.fixture +def db(tmp_path): + database = Database(str(tmp_path / "test.db")) + database.initialize() + return database + + +@pytest.fixture +def manager(): + return AnnounceManager() + + +@pytest.fixture +def app(db, manager): + a = FastAPI() + router = create_dashboard_router( + db=db, + manager=manager, + admin_token="test-token", + web_user="nick", + web_password="secret", + secret_key="test-secret-key", + show_day=2, + show_hour=22, + ) + a.include_router(router) + return a + + +@pytest.fixture +def client(app): + return TestClient(app) + + +def _seed_show(db): + week_start = datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc) + week_end = datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc) + show = db.get_or_create_show(week_start, week_end) + t1 = Track(1, "Song A", "Artist A", "https://soundcloud.com/a/1", None, 180000, "cc-by", + datetime(2026, 3, 14, 1, 0, 0, tzinfo=timezone.utc), "{}") + db.upsert_track(t1) + db.set_show_tracks(show.id, [t1.id]) + return show + + +# --- Session auth tests --- + +def test_dashboard_redirects_without_session(client): + resp = client.get("/dashboard", follow_redirects=False) + assert resp.status_code == 303 + assert "/login" in resp.headers["location"] + + +def test_login_page_renders(client): + resp = client.get("/login") + assert resp.status_code == 200 + assert "login" in resp.text.lower() + + +def test_login_with_valid_credentials(client): + resp = client.post( + "/login", + data={"username": "nick", "password": "secret"}, + follow_redirects=False, + ) + assert resp.status_code == 303 + assert "/dashboard" in resp.headers["location"] + assert "ntr_session" in resp.cookies + + +def test_login_with_invalid_credentials(client): + resp = client.post( + "/login", + data={"username": "nick", "password": "wrong"}, + follow_redirects=False, + ) + assert resp.status_code == 200 + assert "invalid" in resp.text.lower() or "incorrect" in resp.text.lower() + + +def test_dashboard_accessible_with_session(client): + client.post( + "/login", + data={"username": "nick", "password": "secret"}, + follow_redirects=False, + ) + resp = client.get("/dashboard") + assert resp.status_code == 200 + assert "dashboard" in resp.text.lower() + + +def test_logout_clears_session(client): + client.post( + "/login", + data={"username": "nick", "password": "secret"}, + follow_redirects=False, + ) + resp = client.get("/logout", follow_redirects=False) + assert resp.status_code == 303 + assert "/login" in resp.headers["location"] + + resp2 = client.get("/dashboard", follow_redirects=False) + assert resp2.status_code == 303 + + +# --- Announce endpoint tests --- + +def test_announce_with_session(client, db): + _seed_show(db) + client.post( + "/login", + data={"username": "nick", "password": "secret"}, + follow_redirects=False, + ) + resp = client.post("/admin/announce", json={"show_id": 1, "position": 1}) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "announced" + assert "Now Playing:" in data["message"] + assert "Song A" in data["message"] + assert "Artist A" in data["message"] + + +def test_announce_with_bearer(client, db): + _seed_show(db) + resp = client.post( + "/admin/announce", + json={"show_id": 1, "position": 1}, + headers={"Authorization": "Bearer test-token"}, + ) + assert resp.status_code == 200 + assert "Now Playing:" in resp.json()["message"] + + +def test_announce_without_auth(client, db): + _seed_show(db) + resp = client.post("/admin/announce", json={"show_id": 1, "position": 1}) + assert resp.status_code == 401 + + +def test_announce_invalid_position(client, db): + _seed_show(db) + resp = client.post( + "/admin/announce", + json={"show_id": 1, "position": 99}, + headers={"Authorization": "Bearer test-token"}, + ) + assert resp.status_code == 404 + + +# --- WebSocket tests --- + +def test_ws_subscribe_with_valid_token(app): + with TestClient(app) as c: + with c.websocket_connect("/ws/announce") as ws: + ws.send_json({"type": "subscribe", "token": "test-token"}) + data = ws.receive_json() + assert data["type"] == "status" + + +def test_ws_subscribe_with_invalid_token(app): + with TestClient(app) as c: + with c.websocket_connect("/ws/announce") as ws: + ws.send_json({"type": "subscribe", "token": "wrong"}) + with pytest.raises(Exception): + ws.receive_json()