Add dashboard router with session auth, announce endpoint, and WebSocket handler
- Login/logout/dashboard with HMAC-signed session cookies - POST /admin/announce with session or bearer auth - WS /ws/announce for subscribe/broadcast - Static stubs: login.html, dashboard.html Made-with: Cursor
This commit is contained in:
147
src/ntr_fetcher/dashboard.py
Normal file
147
src/ntr_fetcher/dashboard.py
Normal file
@@ -0,0 +1,147 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, Request, HTTPException, Form, WebSocket, WebSocketDisconnect
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ntr_fetcher.db import Database
|
||||
from ntr_fetcher.websocket import AnnounceManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
STATIC_DIR = Path(__file__).parent / "static"
|
||||
SESSION_MAX_AGE = 86400
|
||||
|
||||
|
||||
def _sign(data: str, secret: str) -> str:
|
||||
sig = hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest()
|
||||
return f"{data}.{sig}"
|
||||
|
||||
|
||||
def _unsign(signed: str, secret: str, max_age: int = SESSION_MAX_AGE) -> str | None:
|
||||
if "." not in signed:
|
||||
return None
|
||||
data, sig = signed.rsplit(".", 1)
|
||||
expected = hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest()
|
||||
if not hmac.compare_digest(sig, expected):
|
||||
return None
|
||||
try:
|
||||
payload = json.loads(data)
|
||||
if time.time() - payload.get("t", 0) > max_age:
|
||||
return None
|
||||
return payload.get("user")
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
return None
|
||||
|
||||
|
||||
class AnnounceRequest(BaseModel):
|
||||
show_id: int
|
||||
position: int
|
||||
|
||||
|
||||
def create_dashboard_router(
|
||||
db: Database,
|
||||
manager: AnnounceManager,
|
||||
admin_token: str,
|
||||
web_user: str,
|
||||
web_password: str,
|
||||
secret_key: str,
|
||||
show_day: int = 2,
|
||||
show_hour: int = 22,
|
||||
) -> APIRouter:
|
||||
router = APIRouter()
|
||||
|
||||
def _get_session_user(request: Request) -> str | None:
|
||||
cookie = request.cookies.get("ntr_session")
|
||||
if not cookie:
|
||||
return None
|
||||
return _unsign(cookie, secret_key)
|
||||
|
||||
@router.get("/login", response_class=HTMLResponse)
|
||||
def login_page():
|
||||
html = (STATIC_DIR / "login.html").read_text()
|
||||
return HTMLResponse(html)
|
||||
|
||||
@router.post("/login")
|
||||
def login_submit(username: str = Form(...), password: str = Form(...)):
|
||||
if username == web_user and password == web_password:
|
||||
payload = json.dumps({"user": username, "t": int(time.time())})
|
||||
cookie_value = _sign(payload, secret_key)
|
||||
response = RedirectResponse(url="/dashboard", status_code=303)
|
||||
response.set_cookie(
|
||||
"ntr_session",
|
||||
cookie_value,
|
||||
max_age=SESSION_MAX_AGE,
|
||||
httponly=True,
|
||||
samesite="lax",
|
||||
)
|
||||
return response
|
||||
html = (STATIC_DIR / "login.html").read_text()
|
||||
html = html.replace("<!--ERROR-->", '<p class="error">Invalid username or password</p>')
|
||||
return HTMLResponse(html, status_code=200)
|
||||
|
||||
@router.get("/logout")
|
||||
def logout():
|
||||
response = RedirectResponse(url="/login", status_code=303)
|
||||
response.delete_cookie("ntr_session")
|
||||
return response
|
||||
|
||||
@router.get("/dashboard", response_class=HTMLResponse)
|
||||
def dashboard(request: Request):
|
||||
user = _get_session_user(request)
|
||||
if user is None:
|
||||
return RedirectResponse(url="/login", status_code=303)
|
||||
html = (STATIC_DIR / "dashboard.html").read_text()
|
||||
html = html.replace("{{WS_TOKEN}}", admin_token)
|
||||
return HTMLResponse(html)
|
||||
|
||||
@router.post("/admin/announce")
|
||||
async def announce(body: AnnounceRequest, request: Request):
|
||||
user = _get_session_user(request)
|
||||
if user is None:
|
||||
auth_header = request.headers.get("authorization", "")
|
||||
if not auth_header.startswith("Bearer ") or auth_header.removeprefix("Bearer ") != admin_token:
|
||||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||||
|
||||
track = db.get_show_track_by_position(body.show_id, body.position)
|
||||
if track is None:
|
||||
raise HTTPException(status_code=404, detail=f"No track at position {body.position}")
|
||||
|
||||
message = (
|
||||
f"Now Playing: Song #{track['position']}: "
|
||||
f"{track['title']} by {track['artist']} - {track['permalink_url']}"
|
||||
)
|
||||
await manager.broadcast({"type": "announce", "message": message})
|
||||
return {"status": "announced", "message": message}
|
||||
|
||||
@router.websocket("/ws/announce")
|
||||
async def ws_announce(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
try:
|
||||
auth_msg = await websocket.receive_json()
|
||||
if auth_msg.get("type") != "subscribe" or auth_msg.get("token") != admin_token:
|
||||
await websocket.close(code=4001, reason="Invalid token")
|
||||
return
|
||||
|
||||
manager.add_subscriber(websocket)
|
||||
await manager.broadcast_status()
|
||||
try:
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
finally:
|
||||
manager.remove_subscriber(websocket)
|
||||
try:
|
||||
await manager.broadcast_status()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return router
|
||||
5
src/ntr_fetcher/static/dashboard.html
Normal file
5
src/ntr_fetcher/static/dashboard.html
Normal file
@@ -0,0 +1,5 @@
|
||||
<!DOCTYPE html>
|
||||
<html><head><title>Dashboard</title></head>
|
||||
<body><h1>NtR Playlist Dashboard</h1>
|
||||
<script>const WS_TOKEN = "{{WS_TOKEN}}";</script>
|
||||
</body></html>
|
||||
10
src/ntr_fetcher/static/login.html
Normal file
10
src/ntr_fetcher/static/login.html
Normal file
@@ -0,0 +1,10 @@
|
||||
<!DOCTYPE html>
|
||||
<html><head><title>Login</title></head>
|
||||
<body>
|
||||
<!--ERROR-->
|
||||
<form method="post" action="/login">
|
||||
<label>Username <input name="username"></label>
|
||||
<label>Password <input name="password" type="password"></label>
|
||||
<button type="submit">Login</button>
|
||||
</form>
|
||||
</body></html>
|
||||
178
tests/test_dashboard.py
Normal file
178
tests/test_dashboard.py
Normal file
@@ -0,0 +1,178 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from ntr_fetcher.dashboard import create_dashboard_router
|
||||
from ntr_fetcher.db import Database
|
||||
from ntr_fetcher.models import Track
|
||||
from ntr_fetcher.websocket import AnnounceManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db(tmp_path):
|
||||
database = Database(str(tmp_path / "test.db"))
|
||||
database.initialize()
|
||||
return database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def manager():
|
||||
return AnnounceManager()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(db, manager):
|
||||
a = FastAPI()
|
||||
router = create_dashboard_router(
|
||||
db=db,
|
||||
manager=manager,
|
||||
admin_token="test-token",
|
||||
web_user="nick",
|
||||
web_password="secret",
|
||||
secret_key="test-secret-key",
|
||||
show_day=2,
|
||||
show_hour=22,
|
||||
)
|
||||
a.include_router(router)
|
||||
return a
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app):
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def _seed_show(db):
|
||||
week_start = datetime(2026, 3, 12, 2, 0, 0, tzinfo=timezone.utc)
|
||||
week_end = datetime(2026, 3, 19, 2, 0, 0, tzinfo=timezone.utc)
|
||||
show = db.get_or_create_show(week_start, week_end)
|
||||
t1 = Track(1, "Song A", "Artist A", "https://soundcloud.com/a/1", None, 180000, "cc-by",
|
||||
datetime(2026, 3, 14, 1, 0, 0, tzinfo=timezone.utc), "{}")
|
||||
db.upsert_track(t1)
|
||||
db.set_show_tracks(show.id, [t1.id])
|
||||
return show
|
||||
|
||||
|
||||
# --- Session auth tests ---
|
||||
|
||||
def test_dashboard_redirects_without_session(client):
|
||||
resp = client.get("/dashboard", follow_redirects=False)
|
||||
assert resp.status_code == 303
|
||||
assert "/login" in resp.headers["location"]
|
||||
|
||||
|
||||
def test_login_page_renders(client):
|
||||
resp = client.get("/login")
|
||||
assert resp.status_code == 200
|
||||
assert "login" in resp.text.lower()
|
||||
|
||||
|
||||
def test_login_with_valid_credentials(client):
|
||||
resp = client.post(
|
||||
"/login",
|
||||
data={"username": "nick", "password": "secret"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 303
|
||||
assert "/dashboard" in resp.headers["location"]
|
||||
assert "ntr_session" in resp.cookies
|
||||
|
||||
|
||||
def test_login_with_invalid_credentials(client):
|
||||
resp = client.post(
|
||||
"/login",
|
||||
data={"username": "nick", "password": "wrong"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert "invalid" in resp.text.lower() or "incorrect" in resp.text.lower()
|
||||
|
||||
|
||||
def test_dashboard_accessible_with_session(client):
|
||||
client.post(
|
||||
"/login",
|
||||
data={"username": "nick", "password": "secret"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
resp = client.get("/dashboard")
|
||||
assert resp.status_code == 200
|
||||
assert "dashboard" in resp.text.lower()
|
||||
|
||||
|
||||
def test_logout_clears_session(client):
|
||||
client.post(
|
||||
"/login",
|
||||
data={"username": "nick", "password": "secret"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
resp = client.get("/logout", follow_redirects=False)
|
||||
assert resp.status_code == 303
|
||||
assert "/login" in resp.headers["location"]
|
||||
|
||||
resp2 = client.get("/dashboard", follow_redirects=False)
|
||||
assert resp2.status_code == 303
|
||||
|
||||
|
||||
# --- Announce endpoint tests ---
|
||||
|
||||
def test_announce_with_session(client, db):
|
||||
_seed_show(db)
|
||||
client.post(
|
||||
"/login",
|
||||
data={"username": "nick", "password": "secret"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
resp = client.post("/admin/announce", json={"show_id": 1, "position": 1})
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "announced"
|
||||
assert "Now Playing:" in data["message"]
|
||||
assert "Song A" in data["message"]
|
||||
assert "Artist A" in data["message"]
|
||||
|
||||
|
||||
def test_announce_with_bearer(client, db):
|
||||
_seed_show(db)
|
||||
resp = client.post(
|
||||
"/admin/announce",
|
||||
json={"show_id": 1, "position": 1},
|
||||
headers={"Authorization": "Bearer test-token"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert "Now Playing:" in resp.json()["message"]
|
||||
|
||||
|
||||
def test_announce_without_auth(client, db):
|
||||
_seed_show(db)
|
||||
resp = client.post("/admin/announce", json={"show_id": 1, "position": 1})
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_announce_invalid_position(client, db):
|
||||
_seed_show(db)
|
||||
resp = client.post(
|
||||
"/admin/announce",
|
||||
json={"show_id": 1, "position": 99},
|
||||
headers={"Authorization": "Bearer test-token"},
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
# --- WebSocket tests ---
|
||||
|
||||
def test_ws_subscribe_with_valid_token(app):
|
||||
with TestClient(app) as c:
|
||||
with c.websocket_connect("/ws/announce") as ws:
|
||||
ws.send_json({"type": "subscribe", "token": "test-token"})
|
||||
data = ws.receive_json()
|
||||
assert data["type"] == "status"
|
||||
|
||||
|
||||
def test_ws_subscribe_with_invalid_token(app):
|
||||
with TestClient(app) as c:
|
||||
with c.websocket_connect("/ws/announce") as ws:
|
||||
ws.send_json({"type": "subscribe", "token": "wrong"})
|
||||
with pytest.raises(Exception):
|
||||
ws.receive_json()
|
||||
Reference in New Issue
Block a user