Files
SongRequest/SongRequest/web.py
cottongin 93f55840a5 Add explicit filtering, fuzzy matching, reply modes, bulk actions, and refresh README
- Explicit/clean track filtering with configurable explicitMode per channel
- Last.fm spell correction and smart dual-strategy iTunes search
- Configurable queuedReplyMode and announceReplyMode (channel/private/notice)
- Per-channel announceApproved/announceRejected/announceNowPlaying toggles
- Bulk select mode for mass approve/reject/mark-played in web dashboard
- Comprehensive README rewrite covering all current features and config

Made-with: Cursor
2026-04-03 22:34:42 -04:00

732 lines
29 KiB
Python

import asyncio
import datetime
import html
import json
import os
import threading
import time
import weakref
from typing import Optional, Callable
import aiohttp
from aiohttp import web
import supybot.log as log
from .store import SongRequest as SongRequestModel, Session, VALID_STATUSES
TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), "templates")
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
def _render_alternates(request_id: int, alternates_json: str, status: str) -> str:
"""Render alternate tracks as a collapsible details section."""
if not alternates_json:
return ""
try:
alts = json.loads(alternates_json)
except (json.JSONDecodeError, TypeError):
return ""
if not alts:
return ""
esc = html.escape
show_approve = status == "pending"
show_play = status in ("pending", "approved")
items = []
for idx, alt in enumerate(alts):
artwork = esc(alt.get("artwork_url", "").replace("600x600", "100x100"))
title = esc(alt.get("title", ""))
artist = esc(alt.get("artist", ""))
url = esc(alt.get("apple_music_url", ""))
buttons = []
if show_approve:
buttons.append(
f'<button hx-post="/api/requests/{request_id}/approve-alt/{idx}"'
f' hx-swap="outerHTML" hx-target="#request-{request_id}"'
f' class="btn btn-approve btn-sm admin-only"'
f' onclick="event.stopPropagation();">Approve</button>'
)
if show_play:
buttons.append(
f'<button hx-post="/api/requests/{request_id}/play-alt/{idx}"'
f' hx-swap="outerHTML" hx-target="#request-{request_id}"'
f' class="btn btn-played btn-sm admin-only"'
f' onclick="event.stopPropagation();">Mark Played</button>'
)
actions_html = f'<div class="alt-actions admin-only">{"".join(buttons)}</div>' if buttons else ""
items.append(
f'<div class="alt-card" onclick="event.stopPropagation(); window.open(\'{url}\',\'_blank\')">'
f'<img class="alt-art" src="{artwork}" alt="" loading="lazy" />'
f'<div class="alt-info"><span class="alt-title">{title}</span>'
f'<span class="alt-artist">{artist}</span></div>'
f'{actions_html}</div>'
)
return (
'<details class="alternates" onclick="event.stopPropagation();">'
f'<summary>{len(items)} other match{"es" if len(items) != 1 else ""}</summary>'
'<div class="alt-list">' + "".join(items) + '</div></details>'
)
def render_request_card(req: SongRequestModel) -> str:
"""Render a single request as an HTML card fragment."""
status_classes = {
"pending": "status-pending",
"approved": "status-approved",
"rejected": "status-rejected",
"played": "status-played",
}
status_cls = status_classes.get(req.status, "")
esc = html.escape
created = time.strftime("%H:%M:%S", time.localtime(req.created_at))
actions = ""
if req.status == "pending":
actions = f"""
<div class="card-actions admin-only">
<button hx-post="/api/requests/{req.id}/approve"
hx-swap="outerHTML" hx-target="#request-{req.id}"
class="btn btn-approve" onclick="event.stopPropagation()">Approve</button>
<button hx-post="/api/requests/{req.id}/reject"
hx-swap="outerHTML" hx-target="#request-{req.id}"
class="btn btn-reject" onclick="event.stopPropagation()">Reject</button>
</div>"""
elif req.status == "approved":
actions = f"""
<div class="card-actions admin-only">
<button hx-post="/api/requests/{req.id}/played"
hx-swap="outerHTML" hx-target="#request-{req.id}"
class="btn btn-played" onclick="event.stopPropagation()">Mark Played</button>
</div>"""
alternates_html = _render_alternates(req.id, req.alternates_json, req.status)
return f"""<div id="request-{req.id}" class="card-wrapper" data-status="{esc(req.status)}" data-channel="{esc(req.channel)}">
<div class="card-link request-card {esc(status_cls)}"
onclick="window.open('{esc(req.apple_music_url)}','_blank')">
<input type="checkbox" class="select-checkbox" onclick="event.stopPropagation();" />
<img class="album-art" src="{esc(req.artwork_url)}" alt="Album art" loading="lazy" />
<div class="card-body">
<div class="card-title">{esc(req.title)}</div>
<div class="card-artist">{esc(req.artist)}</div>
<div class="card-album">{esc(req.album)}</div>
<div class="card-meta">Requested by {esc(req.requester_nick)} in {esc(req.channel)} at {created}</div>
<span class="card-status-badge">{esc(req.status)}</span>
</div>
{actions}
</div>
{alternates_html}
</div>"""
class WebServer:
"""Standalone aiohttp web server running in a background thread."""
def __init__(self, host: str, port: int, store, get_auth_token: Callable[[], str],
on_status_change: Optional[Callable] = None,
get_requests_open: Optional[Callable] = None,
set_requests_open: Optional[Callable] = None):
self._host = host
self._port = port
self._store = store
self._get_auth_token = get_auth_token
self._on_status_change = on_status_change
self._get_requests_open = get_requests_open or (lambda: True)
self._set_requests_open = set_requests_open or (lambda v: None)
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
self._runner: Optional[web.AppRunner] = None
self._ws_clients: weakref.WeakSet = weakref.WeakSet()
self._ws_admin_map: dict = {}
def start(self):
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
if self._loop and self._runner:
future = asyncio.run_coroutine_threadsafe(self._shutdown(), self._loop)
try:
future.result(timeout=5)
except Exception:
pass
if self._loop:
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread:
self._thread.join(timeout=5)
def publish(self, event_type: str, data: str):
"""Thread-safe publish from Limnoria threads into the async WebSocket broadcast."""
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(
asyncio.ensure_future,
self._broadcast(event_type, data),
)
def publish_json(self, payload: dict):
"""Thread-safe publish of a raw JSON dict to all WS clients."""
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(
asyncio.ensure_future,
self._broadcast_raw(json.dumps(payload)),
)
def _run(self):
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._start_app())
self._loop.run_forever()
async def _start_app(self):
app = web.Application()
app.router.add_get("/", self._handle_dashboard)
app.router.add_get("/login", self._handle_login_page)
app.router.add_get("/ws", self._handle_ws)
app.router.add_post("/api/auth/login", self._handle_auth_login)
app.router.add_post("/api/auth/logout", self._handle_auth_logout)
app.router.add_get("/api/auth/me", self._handle_auth_me)
app.router.add_get("/api/channels", self._handle_channels)
app.router.add_get("/api/requests", self._handle_api_get)
app.router.add_post("/api/requests/bulk", self._handle_bulk_action)
app.router.add_post("/api/requests/{request_id}/approve-alt/{alt_idx}", self._handle_approve_alt)
app.router.add_post("/api/requests/{request_id}/play-alt/{alt_idx}", self._handle_play_alt)
app.router.add_post("/api/requests/{request_id}/{action}", self._handle_api_action)
app.router.add_get("/api/export/markdown", self._handle_export_markdown)
app.router.add_get("/api/status", self._handle_get_status)
app.router.add_post("/api/status", self._handle_post_status)
app.router.add_post("/api/history/clear", self._handle_clear_history)
app.router.add_get("/api/sessions", self._handle_get_sessions)
app.router.add_post("/api/sessions/start", self._handle_start_session)
app.router.add_post("/api/sessions/stop", self._handle_stop_session)
app.router.add_get("/api/sessions/{session_id}/requests", self._handle_session_requests)
app.router.add_patch("/api/sessions/{session_id}", self._handle_rename_session)
app.router.add_delete("/api/sessions/{session_id}", self._handle_delete_session)
app.router.add_post("/api/sessions/{session_id}/clear", self._handle_clear_session)
app.router.add_static("/static", STATIC_DIR)
# Catch-all for /{channel} URL routing -- must be last
app.router.add_get("/{channel}", self._handle_dashboard)
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._host, self._port)
await site.start()
log.info(f"SongRequest: Web dashboard listening on {self._host}:{self._port}")
async def _shutdown(self):
for ws in set(self._ws_clients):
await ws.close()
if self._runner:
await self._runner.cleanup()
async def _broadcast(self, event_type: str, data: str):
msg = json.dumps({"event": event_type, "html": data})
dead = []
for ws in set(self._ws_clients):
try:
await ws.send_str(msg)
except (ConnectionResetError, Exception):
dead.append(ws)
for ws in dead:
self._ws_clients.discard(ws)
async def _broadcast_raw(self, raw_json: str):
dead = []
for ws in set(self._ws_clients):
try:
await ws.send_str(raw_json)
except (ConnectionResetError, Exception):
dead.append(ws)
for ws in dead:
self._ws_clients.discard(ws)
# ------------------------------------------------------------------
# Auth helpers
# ------------------------------------------------------------------
def _extract_token(self, request: web.Request) -> str:
return (
request.headers.get("X-Auth-Token", "")
or request.query.get("token", "")
)
def _check_auth(self, request: web.Request) -> Optional[str]:
"""Validate request auth. Returns admin username or None.
Accepts either a valid session token or the legacy webAuthToken.
"""
token = self._extract_token(request)
if not token:
return None
username = self._store.validate_session(token)
if username:
return username
legacy = self._get_auth_token()
if legacy and token == legacy:
return "__legacy__"
return None
def _require_auth(self, request: web.Request) -> Optional[web.Response]:
"""Return a 403 Response if auth fails, or None if auth passes."""
if self._check_auth(request) is None:
return web.Response(text="Forbidden", status=403)
return None
def _get_online_admins(self) -> list:
return sorted(set(self._ws_admin_map.values()))
async def _broadcast_presence(self):
admins = self._get_online_admins()
payload = json.dumps({"event": "admin-presence", "admins": admins})
await self._broadcast_raw(payload)
# ------------------------------------------------------------------
# Auth routes
# ------------------------------------------------------------------
async def _handle_login_page(self, request: web.Request) -> web.Response:
template_path = os.path.join(TEMPLATES_DIR, "login.html")
try:
with open(template_path, "r", encoding="utf-8") as f:
content = f.read()
except FileNotFoundError:
return web.Response(text="Login template not found", status=500)
return web.Response(text=content, content_type="text/html")
async def _handle_auth_login(self, request: web.Request) -> web.Response:
try:
data = await request.json()
except Exception:
return web.Response(text="Bad request", status=400)
username = (data.get("username") or "").strip()
key = data.get("key") or ""
if not username or not key:
return web.json_response({"error": "Username and key are required"}, status=400)
admin_id = self._store.validate_admin(username, key)
if admin_id is None:
return web.json_response({"error": "Invalid credentials"}, status=401)
token = self._store.create_session_token(admin_id, username)
return web.json_response({"token": token, "username": username})
async def _handle_auth_logout(self, request: web.Request) -> web.Response:
token = self._extract_token(request)
if token:
self._store.delete_session(token)
return web.json_response({"ok": True})
async def _handle_auth_me(self, request: web.Request) -> web.Response:
token = self._extract_token(request)
username = self._store.validate_session(token) if token else None
if not username:
return web.json_response({"error": "Not authenticated"}, status=401)
return web.json_response({"username": username})
# ------------------------------------------------------------------
# Dashboard & WebSocket
# ------------------------------------------------------------------
async def _handle_dashboard(self, request: web.Request) -> web.Response:
template_path = os.path.join(TEMPLATES_DIR, "index.html")
try:
with open(template_path, "r", encoding="utf-8") as f:
content = f.read()
except FileNotFoundError:
return web.Response(text="Dashboard template not found", status=500)
return web.Response(text=content, content_type="text/html")
async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
self._ws_clients.add(ws)
token = request.query.get("token", "")
admin_username = self._store.validate_session(token) if token else None
if admin_username:
self._ws_admin_map[ws] = admin_username
await self._broadcast_presence()
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.ERROR:
break
finally:
self._ws_clients.discard(ws)
was_admin = ws in self._ws_admin_map
self._ws_admin_map.pop(ws, None)
if was_admin:
await self._broadcast_presence()
return ws
# ------------------------------------------------------------------
# Data API
# ------------------------------------------------------------------
async def _handle_channels(self, request: web.Request) -> web.Response:
channels = self._store.get_channels()
return web.json_response(channels)
async def _handle_api_get(self, request: web.Request) -> web.Response:
status_filter = request.query.get("status")
channel_filter = request.query.get("channel") or None
if status_filter == "history":
reqs = self._store.get_current_session_history(channel=channel_filter)
elif status_filter and status_filter in VALID_STATUSES:
reqs = self._store.get_by_status_and_channel(channel_filter, status_filter)
else:
reqs = self._store.get_by_status_and_channel(channel_filter, "pending", "approved")
cards = "\n".join(render_request_card(r) for r in reqs)
return web.Response(text=cards, content_type="text/html")
async def _handle_bulk_action(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
data = await request.json()
except Exception:
return web.Response(text="Bad request", status=400)
ids = data.get("ids", [])
action = data.get("action", "")
status_map = {"approve": "approved", "reject": "rejected", "played": "played"}
new_status = status_map.get(action)
if not new_status or not isinstance(ids, list) or not ids:
return web.json_response({"error": "Invalid action or empty ids"}, status=400)
processed = 0
for rid in ids:
try:
rid = int(rid)
except (ValueError, TypeError):
continue
req = self._store.update_status(rid, new_status)
if not req:
continue
processed += 1
card_html = render_request_card(req)
await self._broadcast("request-update", card_html)
if self._on_status_change:
try:
self._on_status_change(req)
except Exception:
log.exception("SongRequest: on_status_change callback failed")
return web.json_response({"processed": processed})
async def _handle_api_action(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
request_id = int(request.match_info["request_id"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
action = request.match_info.get("action", "")
status_map = {"approve": "approved", "reject": "rejected", "played": "played"}
new_status = status_map.get(action)
if not new_status:
return web.Response(text="Invalid action", status=400)
req = self._store.update_status(request_id, new_status)
if not req:
return web.Response(text="Request not found", status=404)
card_html = render_request_card(req)
await self._broadcast("request-update", card_html)
if self._on_status_change:
try:
self._on_status_change(req)
except Exception:
log.exception("SongRequest: on_status_change callback failed")
return web.Response(text=card_html, content_type="text/html")
async def _handle_approve_alt(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
request_id = int(request.match_info["request_id"])
alt_idx = int(request.match_info["alt_idx"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
req = self._store.get(request_id)
if not req:
return web.Response(text="Request not found", status=404)
try:
alts = json.loads(req.alternates_json) if req.alternates_json else []
except (json.JSONDecodeError, TypeError):
alts = []
if alt_idx < 0 or alt_idx >= len(alts):
return web.Response(text="Invalid alternate index", status=400)
alt = alts[alt_idx]
self._store.swap_alternate(request_id, alt)
req = self._store.update_status(request_id, "approved")
if not req:
return web.Response(text="Request not found", status=404)
card_html = render_request_card(req)
await self._broadcast("request-update", card_html)
if self._on_status_change:
try:
self._on_status_change(req)
except Exception:
log.exception("SongRequest: on_status_change callback failed")
return web.Response(text=card_html, content_type="text/html")
async def _handle_play_alt(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
request_id = int(request.match_info["request_id"])
alt_idx = int(request.match_info["alt_idx"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
req = self._store.get(request_id)
if not req:
return web.Response(text="Request not found", status=404)
try:
alts = json.loads(req.alternates_json) if req.alternates_json else []
except (json.JSONDecodeError, TypeError):
alts = []
if alt_idx < 0 or alt_idx >= len(alts):
return web.Response(text="Invalid alternate index", status=400)
alt = alts[alt_idx]
self._store.swap_alternate(request_id, alt)
req = self._store.update_status(request_id, "played")
if not req:
return web.Response(text="Request not found", status=404)
card_html = render_request_card(req)
await self._broadcast("request-update", card_html)
if self._on_status_change:
try:
self._on_status_change(req)
except Exception:
log.exception("SongRequest: on_status_change callback failed")
return web.Response(text=card_html, content_type="text/html")
async def _handle_export_markdown(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
channel_filter = request.query.get("channel") or None
reqs = self._store.get_history(limit=5000, channel=channel_filter)
today = datetime.date.today().isoformat()
suffix = f" - {channel_filter}" if channel_filter else ""
lines = [f"# Song Requests Export ({today}{suffix})", "", "| Title | Artist | Album | Requested By | Status | Time | Apple Music |",
"| --- | --- | --- | --- | --- | --- | --- |"]
for r in reqs:
ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(r.created_at))
lines.append(f"| {r.title} | {r.artist} | {r.album} | {r.requester_nick} | {r.status} | {ts} | [link]({r.apple_music_url}) |")
body = "\n".join(lines) + "\n"
return web.Response(
body=body,
content_type="text/markdown",
headers={"Content-Disposition": f'attachment; filename="song-requests-{today}.md"'},
)
async def _handle_get_status(self, request: web.Request) -> web.Response:
is_open = self._get_requests_open()
return web.json_response({"open": is_open})
async def _handle_post_status(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
data = await request.json()
except Exception:
return web.Response(text="Bad request", status=400)
is_open = bool(data.get("open", True))
self._set_requests_open(is_open)
payload = json.dumps({"event": "requests-status", "open": is_open})
dead = []
for ws_client in set(self._ws_clients):
try:
await ws_client.send_str(payload)
except Exception:
dead.append(ws_client)
for ws_client in dead:
self._ws_clients.discard(ws_client)
return web.json_response({"open": is_open})
async def _handle_clear_history(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
count = self._store.clear_history()
payload = json.dumps({"event": "history-cleared"})
dead = []
for ws_client in set(self._ws_clients):
try:
await ws_client.send_str(payload)
except Exception:
dead.append(ws_client)
for ws_client in dead:
self._ws_clients.discard(ws_client)
return web.json_response({"cleared": count})
# ------------------------------------------------------------------
# Sessions API
# ------------------------------------------------------------------
async def _handle_get_sessions(self, request: web.Request) -> web.Response:
active = self._store.get_active_session()
archived = self._store.get_archived_sessions()
result = {
"active": active.to_dict() if active else None,
"archived": [],
}
for s in archived:
d = s.to_dict()
d["played_count"] = self._store.get_session_played_count(s.id)
result["archived"].append(d)
return web.json_response(result)
async def _handle_start_session(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
existing = self._store.get_active_session()
if existing:
return web.json_response({"error": "A session is already active"}, status=409)
try:
data = await request.json()
except Exception:
data = {}
name = data.get("name", "").strip()
if not name:
name = time.strftime("%Y-%m-%d %H:%M")
session = self._store.start_session(name)
payload = json.dumps({"event": "session-started", "session": session.to_dict()})
await self._broadcast_raw(payload)
return web.json_response(session.to_dict())
async def _handle_stop_session(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
active = self._store.get_active_session()
if not active:
return web.json_response({"error": "No active session"}, status=404)
try:
data = await request.json()
except Exception:
data = {}
clear_remaining = bool(data.get("clear_remaining", False))
session = self._store.stop_session(active.id)
if clear_remaining:
self._store.clear_session_non_played(active.id)
payload = json.dumps({"event": "session-stopped", "session": session.to_dict() if session else None})
await self._broadcast_raw(payload)
return web.json_response(session.to_dict() if session else {})
async def _handle_session_requests(self, request: web.Request) -> web.Response:
try:
session_id = int(request.match_info["session_id"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
channel_filter = request.query.get("channel") or None
reqs = self._store.get_session_history(session_id, channel=channel_filter)
cards = "\n".join(render_request_card(r) for r in reqs)
return web.Response(text=cards, content_type="text/html")
async def _handle_rename_session(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
session_id = int(request.match_info["session_id"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
try:
data = await request.json()
except Exception:
return web.Response(text="Bad request", status=400)
name = data.get("name", "").strip()
if not name:
return web.json_response({"error": "Name cannot be empty"}, status=400)
session = self._store.rename_session(session_id, name)
if not session:
return web.json_response({"error": "Session not found"}, status=404)
return web.json_response(session.to_dict())
async def _handle_clear_session(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
session_id = int(request.match_info["session_id"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
count = self._store.clear_session_requests(session_id)
return web.json_response({"cleared": count})
async def _handle_delete_session(self, request: web.Request) -> web.Response:
denied = self._require_auth(request)
if denied:
return denied
try:
session_id = int(request.match_info["session_id"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
deleted = self._store.delete_session(session_id)
if not deleted:
return web.json_response({"error": "Session not found"}, status=404)
return web.json_response({"deleted": True})