import asyncio
import datetime
import hmac
import html
import json
import os
import threading
import time
import weakref
from typing import Optional, Callable

import aiohttp
from aiohttp import web

import supybot.log as log

from .store import SongRequest as SongRequestModel, Session, VALID_STATUSES

TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), "templates")
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")


def _alt_field(alt: dict, key: str) -> str:
    """Return ``alt[key]`` as text, mapping a missing or null value to ""."""
    value = alt.get(key)
    return "" if value is None else str(value)


def _render_alternates(request_id: int, alternates_json: str, status: str) -> str:
    """Render alternate tracks as a collapsible details section.

    Returns "" when ``alternates_json`` is empty, is not valid JSON, does not
    decode to a non-empty list, or contains no dict entries.  Entries that are
    not dicts are skipped; ``idx`` keeps counting over the decoded list so it
    stays aligned with the index accepted by the approve-alt route.
    """
    if not alternates_json:
        return ""
    try:
        alts = json.loads(alternates_json)
    except (json.JSONDecodeError, TypeError):
        return ""
    if not alts or not isinstance(alts, list):
        return ""

    esc = html.escape
    show_approve = status == "pending"
    # NOTE(review): the markup inside the string literals below looks like it
    # was stripped from this copy of the file (request_id, idx, artwork and
    # url are computed but no longer interpolated) -- confirm against version
    # control before shipping.
    items = []
    for idx, alt in enumerate(alts):
        if not isinstance(alt, dict):
            continue
        artwork = esc(_alt_field(alt, "artwork_url").replace("600x600", "100x100"))
        title = esc(_alt_field(alt, "title"))
        artist = esc(_alt_field(alt, "artist"))
        url = esc(_alt_field(alt, "apple_music_url"))
        approve_btn = ""
        if show_approve:
            approve_btn = (
                f''
            )
        items.append(
            f'\n'
            f''
            f'\n{title}'
            f'{artist}\n'
            f'{approve_btn}\n'
        )
    if not items:
        return ""
    return (
        '\n'
        f'{len(items)} other match{"es" if len(items) != 1 else ""}'
        '\n'
        + "".join(items) +
        '\n'
    )


def render_request_card(req: SongRequestModel) -> str:
    """Render a single request as an HTML card fragment.

    Reads ``req.status``, ``req.created_at``, ``req.id`` and
    ``req.alternates_json``.
    """
    status_classes = {
        "pending": "status-pending",
        "approved": "status-approved",
        "rejected": "status-rejected",
        "played": "status-played",
    }
    # NOTE(review): status_cls, esc and created are not interpolated anywhere
    # in the templates as they appear in this copy; the card markup looks
    # stripped -- confirm against version control.
    status_cls = status_classes.get(req.status, "")
    esc = html.escape
    created = time.strftime("%H:%M:%S", time.localtime(req.created_at))

    actions = ""
    if req.status == "pending":
        actions = f"""
"""
    elif req.status == "approved":
        actions = f"""
"""

    alternates_html = _render_alternates(req.id, req.alternates_json, req.status)

    return f"""
{alternates_html}
"""


class WebServer:
    """Standalone aiohttp web server running in a background thread."""

    def __init__(self, host: str, port: int, store,
                 get_auth_token: Callable[[], str],
                 on_status_change: Optional[Callable] = None,
                 get_requests_open: Optional[Callable] = None,
                 set_requests_open: Optional[Callable] = None):
        """Store configuration; nothing is started until :meth:`start`.

        ``store`` is the persistence object the handlers call into.
        ``get_auth_token`` returns the legacy shared token.  The optional
        callbacks default to "requests are always open" / "ignore the value".
        """
        self._host = host
        self._port = port
        self._store = store
        self._get_auth_token = get_auth_token
        self._on_status_change = on_status_change
        self._get_requests_open = get_requests_open or (lambda: True)
        self._set_requests_open = set_requests_open or (lambda v: None)
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._runner: Optional[web.AppRunner] = None
        self._ws_clients: weakref.WeakSet = weakref.WeakSet()
        # Maps a connected WebSocket to the admin username it authenticated as.
        self._ws_admin_map: dict = {}

    def start(self):
        """Create a fresh event loop and run it in a daemon thread."""
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Shut the server down and stop the background loop.

        Waits up to 5 seconds for the shutdown coroutine and up to 5 seconds
        for the thread.  Calling it again after the loop was closed is a no-op.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            return
        # Only submit the shutdown coroutine to a loop that can execute it;
        # otherwise the wait below would just run into its timeout.
        if self._runner and loop.is_running():
            future = asyncio.run_coroutine_threadsafe(self._shutdown(), loop)
            try:
                future.result(timeout=5)
            except Exception:
                log.exception("SongRequest: web dashboard shutdown failed")
        loop.call_soon_threadsafe(loop.stop)
        if self._thread:
            self._thread.join(timeout=5)
            # Release the loop's resources once nothing is driving it anymore.
            if not self._thread.is_alive():
                loop.close()

    def publish(self, event_type: str, data: str):
        """Thread-safe publish from Limnoria threads into the async WebSocket broadcast."""
        self.publish_json({"event": event_type, "html": data})

    def publish_json(self, payload: dict):
        """Thread-safe publish of a raw JSON dict to all WS clients."""
        if self._loop and self._loop.is_running():
            self._loop.call_soon_threadsafe(
                asyncio.ensure_future,
                self._broadcast_raw(json.dumps(payload)),
            )

    def _run(self):
        """Thread target: start the app, then serve until the loop is stopped."""
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_until_complete(self._start_app())
        except Exception:
            # Surface startup failures in the bot log instead of only through
            # the thread's default exception hook.
            log.exception("SongRequest: web dashboard failed to start")
            return
        self._loop.run_forever()

    async def _start_app(self):
        """Register all routes and start listening on the configured address."""
        app = web.Application()
        router = app.router
        router.add_get("/", self._handle_dashboard)
        router.add_get("/login", self._handle_login_page)
        router.add_get("/ws", self._handle_ws)
        router.add_post("/api/auth/login", self._handle_auth_login)
        router.add_post("/api/auth/logout", self._handle_auth_logout)
        router.add_get("/api/auth/me", self._handle_auth_me)
        router.add_get("/api/channels", self._handle_channels)
        router.add_get("/api/requests", self._handle_api_get)
        router.add_post("/api/requests/{request_id}/approve-alt/{alt_idx}", self._handle_approve_alt)
        router.add_post("/api/requests/{request_id}/{action}", self._handle_api_action)
        router.add_get("/api/export/markdown", self._handle_export_markdown)
        router.add_get("/api/status", self._handle_get_status)
        router.add_post("/api/status", self._handle_post_status)
        router.add_post("/api/history/clear", self._handle_clear_history)
        router.add_get("/api/sessions", self._handle_get_sessions)
        router.add_post("/api/sessions/start", self._handle_start_session)
        router.add_post("/api/sessions/stop", self._handle_stop_session)
        router.add_get("/api/sessions/{session_id}/requests", self._handle_session_requests)
        router.add_patch("/api/sessions/{session_id}", self._handle_rename_session)
        router.add_delete("/api/sessions/{session_id}", self._handle_delete_session)
        router.add_post("/api/sessions/{session_id}/clear", self._handle_clear_session)
        router.add_static("/static", STATIC_DIR)
        # Catch-all for /{channel} URL routing -- must be last
        router.add_get("/{channel}", self._handle_dashboard)

        self._runner = web.AppRunner(app)
        await self._runner.setup()
        site = web.TCPSite(self._runner, self._host, self._port)
        await site.start()
        log.info(f"SongRequest: Web dashboard listening on {self._host}:{self._port}")

    async def _shutdown(self):
        """Close every WebSocket, then clean up the runner."""
        for ws in set(self._ws_clients):
            try:
                await ws.close()
            except Exception:
                # One misbehaving socket must not prevent runner cleanup.
                log.exception("SongRequest: failed to close a WebSocket client")
        if self._runner:
            await self._runner.cleanup()

    async def _broadcast(self, event_type: str, data: str):
        """Send ``{"event": event_type, "html": data}`` to all WS clients."""
        await self._broadcast_raw(json.dumps({"event": event_type, "html": data}))

    async def _broadcast_raw(self, raw_json: str):
        """Send a pre-serialized message to all WS clients, dropping dead ones."""
        dead = []
        # Iterate over a snapshot: the weak set can change while we await.
        for ws in set(self._ws_clients):
            try:
                await ws.send_str(raw_json)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self._ws_clients.discard(ws)

    # ------------------------------------------------------------------
    # Request helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _read_json_object(request: web.Request) -> Optional[dict]:
        """Return the request body as a dict, or None if it is not a JSON object."""
        try:
            data = await request.json()
        except Exception:
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _clean_name(data: dict) -> str:
        """Return the stripped ``name`` field, or "" if absent or not a string."""
        name = data.get("name")
        return name.strip() if isinstance(name, str) else ""

    @staticmethod
    def _session_id_from(request: web.Request) -> Optional[int]:
        """Parse the ``session_id`` path parameter; None if missing or not an int."""
        try:
            return int(request.match_info["session_id"])
        except (ValueError, KeyError):
            return None

    @staticmethod
    def _serve_template(filename: str, missing_message: str) -> web.Response:
        """Return a template file as text/html, or a 500 if the file is missing."""
        template_path = os.path.join(TEMPLATES_DIR, filename)
        try:
            with open(template_path, "r", encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            return web.Response(text=missing_message, status=500)
        return web.Response(text=content, content_type="text/html")

    @staticmethod
    def _md_cell(value) -> str:
        """Make a value safe for one Markdown table cell.

        None becomes an empty cell; pipes are escaped and line breaks are
        replaced by spaces so a value cannot break the row.
        """
        text = "" if value is None else str(value)
        return text.replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    async def _announce_status_change(self, req) -> web.Response:
        """Broadcast the updated card, run the status callback, return the card."""
        card_html = render_request_card(req)
        await self._broadcast("request-update", card_html)
        if self._on_status_change:
            try:
                self._on_status_change(req)
            except Exception:
                log.exception("SongRequest: on_status_change callback failed")
        return web.Response(text=card_html, content_type="text/html")

    # ------------------------------------------------------------------
    # Auth helpers
    # ------------------------------------------------------------------

    def _extract_token(self, request: web.Request) -> str:
        """Return the token from the X-Auth-Token header, else the ``token`` query arg."""
        return (
            request.headers.get("X-Auth-Token", "")
            or request.query.get("token", "")
        )

    def _check_auth(self, request: web.Request) -> Optional[str]:
        """Validate request auth. Returns admin username or None.

        Accepts either a valid session token or the legacy webAuthToken.
        """
        token = self._extract_token(request)
        if not token:
            return None
        username = self._store.validate_session(token)
        if username:
            return username
        legacy = self._get_auth_token()
        # Constant-time comparison; compare bytes because compare_digest only
        # accepts str arguments when they are pure ASCII.
        if legacy and hmac.compare_digest(
            token.encode("utf-8", "surrogatepass"),
            str(legacy).encode("utf-8", "surrogatepass"),
        ):
            return "__legacy__"
        return None

    def _require_auth(self, request: web.Request) -> Optional[web.Response]:
        """Return a 403 Response if auth fails, or None if auth passes."""
        if self._check_auth(request) is None:
            return web.Response(text="Forbidden", status=403)
        return None

    def _get_online_admins(self) -> list:
        """Return the sorted, de-duplicated usernames of connected admins."""
        return sorted(set(self._ws_admin_map.values()))

    async def _broadcast_presence(self):
        """Send the current admin list to all WS clients."""
        admins = self._get_online_admins()
        payload = json.dumps({"event": "admin-presence", "admins": admins})
        await self._broadcast_raw(payload)

    # ------------------------------------------------------------------
    # Auth routes
    # ------------------------------------------------------------------

    async def _handle_login_page(self, request: web.Request) -> web.Response:
        """Serve templates/login.html."""
        return self._serve_template("login.html", "Login template not found")

    async def _handle_auth_login(self, request: web.Request) -> web.Response:
        """Exchange a username/key pair for a session token.

        400 for a body that is not a JSON object or lacks string credentials,
        401 for credentials the store rejects.
        """
        data = await self._read_json_object(request)
        if data is None:
            return web.Response(text="Bad request", status=400)
        raw_username = data.get("username")
        raw_key = data.get("key")
        username = raw_username.strip() if isinstance(raw_username, str) else ""
        key = raw_key if isinstance(raw_key, str) else ""
        if not username or not key:
            return web.json_response({"error": "Username and key are required"}, status=400)
        admin_id = self._store.validate_admin(username, key)
        if admin_id is None:
            return web.json_response({"error": "Invalid credentials"}, status=401)
        token = self._store.create_session_token(admin_id, username)
        return web.json_response({"token": token, "username": username})

    async def _handle_auth_logout(self, request: web.Request) -> web.Response:
        """Delete the caller's session token, if one was sent; always replies ok."""
        token = self._extract_token(request)
        if token:
            self._store.delete_session(token)
        return web.json_response({"ok": True})

    async def _handle_auth_me(self, request: web.Request) -> web.Response:
        """Return the username behind the caller's session token, or 401."""
        token = self._extract_token(request)
        username = self._store.validate_session(token) if token else None
        if not username:
            return web.json_response({"error": "Not authenticated"}, status=401)
        return web.json_response({"username": username})

    # ------------------------------------------------------------------
    # Dashboard & WebSocket
    # ------------------------------------------------------------------

    async def _handle_dashboard(self, request: web.Request) -> web.Response:
        """Serve templates/index.html (also used for the /{channel} catch-all)."""
        return self._serve_template("index.html", "Dashboard template not found")

    async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
        """Register a WebSocket client and keep it open until it disconnects.

        A valid session token in the ``token`` query arg marks the socket as
        an admin, which triggers presence broadcasts on connect and disconnect.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self._ws_clients.add(ws)

        token = request.query.get("token", "")
        admin_username = self._store.validate_session(token) if token else None
        if admin_username:
            self._ws_admin_map[ws] = admin_username
            await self._broadcast_presence()

        try:
            # Incoming messages are ignored; the loop only keeps the socket alive.
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.ERROR:
                    break
        finally:
            self._ws_clients.discard(ws)
            was_admin = ws in self._ws_admin_map
            self._ws_admin_map.pop(ws, None)
            if was_admin:
                await self._broadcast_presence()
        return ws

    # ------------------------------------------------------------------
    # Data API
    # ------------------------------------------------------------------

    async def _handle_channels(self, request: web.Request) -> web.Response:
        """Return the store's channel list as JSON."""
        channels = self._store.get_channels()
        return web.json_response(channels)

    async def _handle_api_get(self, request: web.Request) -> web.Response:
        """Return request cards as HTML, filtered by ``status`` and ``channel``.

        ``status=history`` selects the current session's history; any other
        valid status selects that status; anything else falls back to
        pending + approved.
        """
        status_filter = request.query.get("status")
        channel_filter = request.query.get("channel") or None
        if status_filter == "history":
            reqs = self._store.get_current_session_history(channel=channel_filter)
        elif status_filter and status_filter in VALID_STATUSES:
            reqs = self._store.get_by_status_and_channel(channel_filter, status_filter)
        else:
            reqs = self._store.get_by_status_and_channel(channel_filter, "pending", "approved")
        cards = "\n".join(render_request_card(r) for r in reqs)
        return web.Response(text=cards, content_type="text/html")

    async def _handle_api_action(self, request: web.Request) -> web.Response:
        """Apply approve / reject / played to a request (auth required)."""
        denied = self._require_auth(request)
        if denied:
            return denied
        try:
            request_id = int(request.match_info["request_id"])
        except (ValueError, KeyError):
            return web.Response(text="Bad request", status=400)

        action = request.match_info.get("action", "")
        status_map = {"approve": "approved", "reject": "rejected", "played": "played"}
        new_status = status_map.get(action)
        if not new_status:
            return web.Response(text="Invalid action", status=400)

        req = self._store.update_status(request_id, new_status)
        if not req:
            return web.Response(text="Request not found", status=404)
        return await self._announce_status_change(req)

    async def _handle_approve_alt(self, request: web.Request) -> web.Response:
        """Swap in the alternate at ``alt_idx`` and approve the request (auth required)."""
        denied = self._require_auth(request)
        if denied:
            return denied
        try:
            request_id = int(request.match_info["request_id"])
            alt_idx = int(request.match_info["alt_idx"])
        except (ValueError, KeyError):
            return web.Response(text="Bad request", status=400)

        req = self._store.get(request_id)
        if not req:
            return web.Response(text="Request not found", status=404)

        try:
            alts = json.loads(req.alternates_json) if req.alternates_json else []
        except (json.JSONDecodeError, TypeError):
            alts = []
        # A payload that is not a list has no valid indices at all.
        if not isinstance(alts, list) or not 0 <= alt_idx < len(alts):
            return web.Response(text="Invalid alternate index", status=400)

        self._store.swap_alternate(request_id, alts[alt_idx])
        req = self._store.update_status(request_id, "approved")
        if not req:
            return web.Response(text="Request not found", status=404)
        return await self._announce_status_change(req)

    async def _handle_export_markdown(self, request: web.Request) -> web.Response:
        """Download up to 5000 history rows as a Markdown table (auth required)."""
        denied = self._require_auth(request)
        if denied:
            return denied
        channel_filter = request.query.get("channel") or None
        reqs = self._store.get_history(limit=5000, channel=channel_filter)
        today = datetime.date.today().isoformat()
        suffix = f" - {channel_filter}" if channel_filter else ""
        cell = self._md_cell
        lines = [
            f"# Song Requests Export ({today}{suffix})",
            "",
            "| Title | Artist | Album | Requested By | Status | Time | Apple Music |",
            "| --- | --- | --- | --- | --- | --- | --- |",
        ]
        for r in reqs:
            ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(r.created_at))
            url = cell(r.apple_music_url)
            link = f"[link]({url})" if url else ""
            lines.append(
                f"| {cell(r.title)} | {cell(r.artist)} | {cell(r.album)} "
                f"| {cell(r.requester_nick)} | {cell(r.status)} | {ts} | {link} |"
            )
        # text= (rather than body=) lets aiohttp encode the string and
        # advertise the charset in the Content-Type header.
        return web.Response(
            text="\n".join(lines) + "\n",
            content_type="text/markdown",
            headers={"Content-Disposition": f'attachment; filename="song-requests-{today}.md"'},
        )

    async def _handle_get_status(self, request: web.Request) -> web.Response:
        """Report whether requests are currently open."""
        is_open = self._get_requests_open()
        return web.json_response({"open": is_open})

    async def _handle_post_status(self, request: web.Request) -> web.Response:
        """Open or close requests and notify all WS clients (auth required)."""
        denied = self._require_auth(request)
        if denied:
            return denied
        data = await self._read_json_object(request)
        if data is None:
            return web.Response(text="Bad request", status=400)
        is_open = bool(data.get("open", True))
        self._set_requests_open(is_open)
        await self._broadcast_raw(json.dumps({"event": "requests-status", "open": is_open}))
        return web.json_response({"open": is_open})

    async def _handle_clear_history(self, request: web.Request) -> web.Response:
        """Clear history in the store and notify all WS clients (auth required)."""
        denied = self._require_auth(request)
        if denied:
            return denied
        count = self._store.clear_history()
        await self._broadcast_raw(json.dumps({"event": "history-cleared"}))
        return web.json_response({"cleared": count})

    # ------------------------------------------------------------------
    # Sessions API
    # ------------------------------------------------------------------

    async def _handle_get_sessions(self, request: web.Request) -> web.Response:
        """Return the active session and archived sessions with played counts."""
        active = self._store.get_active_session()
        archived = self._store.get_archived_sessions()
        result = {
            "active": active.to_dict() if active else None,
            "archived": [],
        }
        for s in archived:
            d = s.to_dict()
            d["played_count"] = self._store.get_session_played_count(s.id)
            result["archived"].append(d)
        return web.json_response(result)

    async def _handle_start_session(self, request: web.Request) -> web.Response:
        """Start a session (auth required); 409 if one is already active.

        A missing, unreadable or non-string name falls back to a timestamp.
        """
        denied = self._require_auth(request)
        if denied:
            return denied
        existing = self._store.get_active_session()
        if existing:
            return web.json_response({"error": "A session is already active"}, status=409)
        data = await self._read_json_object(request) or {}
        name = self._clean_name(data)
        if not name:
            name = time.strftime("%Y-%m-%d %H:%M")
        session = self._store.start_session(name)
        payload = json.dumps({"event": "session-started", "session": session.to_dict()})
        await self._broadcast_raw(payload)
        return web.json_response(session.to_dict())

    async def _handle_stop_session(self, request: web.Request) -> web.Response:
        """Stop the active session (auth required); 404 if none is active.

        ``clear_remaining`` in the optional JSON body also clears the
        session's non-played requests.
        """
        denied = self._require_auth(request)
        if denied:
            return denied
        active = self._store.get_active_session()
        if not active:
            return web.json_response({"error": "No active session"}, status=404)
        data = await self._read_json_object(request) or {}
        clear_remaining = bool(data.get("clear_remaining", False))
        session = self._store.stop_session(active.id)
        if clear_remaining:
            self._store.clear_session_non_played(active.id)
        payload = json.dumps({"event": "session-stopped", "session": session.to_dict() if session else None})
        await self._broadcast_raw(payload)
        return web.json_response(session.to_dict() if session else {})

    async def _handle_session_requests(self, request: web.Request) -> web.Response:
        """Return one session's request cards as HTML, optionally by channel."""
        session_id = self._session_id_from(request)
        if session_id is None:
            return web.Response(text="Bad request", status=400)
        channel_filter = request.query.get("channel") or None
        reqs = self._store.get_session_history(session_id, channel=channel_filter)
        cards = "\n".join(render_request_card(r) for r in reqs)
        return web.Response(text=cards, content_type="text/html")

    async def _handle_rename_session(self, request: web.Request) -> web.Response:
        """Rename a session (auth required); 400 on bad input, 404 if unknown."""
        denied = self._require_auth(request)
        if denied:
            return denied
        session_id = self._session_id_from(request)
        if session_id is None:
            return web.Response(text="Bad request", status=400)
        data = await self._read_json_object(request)
        if data is None:
            return web.Response(text="Bad request", status=400)
        name = self._clean_name(data)
        if not name:
            return web.json_response({"error": "Name cannot be empty"}, status=400)
        session = self._store.rename_session(session_id, name)
        if not session:
            return web.json_response({"error": "Session not found"}, status=404)
        return web.json_response(session.to_dict())

    async def _handle_clear_session(self, request: web.Request) -> web.Response:
        """Clear a session's requests and report the count (auth required)."""
        denied = self._require_auth(request)
        if denied:
            return denied
        session_id = self._session_id_from(request)
        if session_id is None:
            return web.Response(text="Bad request", status=400)
        count = self._store.clear_session_requests(session_id)
        return web.json_response({"cleared": count})

    async def _handle_delete_session(self, request: web.Request) -> web.Response:
        """Delete a session (auth required); 404 if the store reports none deleted."""
        denied = self._require_auth(request)
        if denied:
            return denied
        session_id = self._session_id_from(request)
        if session_id is None:
            return web.Response(text="Bad request", status=400)
        deleted = self._store.delete_session(session_id)
        if not deleted:
            return web.json_response({"error": "Session not found"}, status=404)
        return web.json_response({"deleted": True})