""" NtR Playlist — Limnoria plugin for NtR SoundCloud Fetcher API. """ from __future__ import annotations import json import logging import threading import re import urllib.error import urllib.request from datetime import datetime from zoneinfo import ZoneInfo from supybot import callbacks from supybot.commands import optional, wrap LOGGER = logging.getLogger(__name__) # --- API helpers ------------------------------------------------------------- class ApiError(Exception): def __init__(self, status_code: int, detail: str): self.status_code = status_code self.detail = detail super().__init__(f"{status_code}: {detail}") def _api_get(base_url: str, path: str) -> dict: url = f"{base_url.rstrip('/')}{path}" req = urllib.request.Request(url, headers={"Accept": "application/json"}) try: with urllib.request.urlopen(req, timeout=10) as resp: raw = resp.read().decode() try: return json.loads(raw) except json.JSONDecodeError as exc: raise ApiError(resp.status, "Invalid API response") from exc except urllib.error.HTTPError as e: try: detail = json.loads(e.read().decode()).get("detail", str(e)) except (json.JSONDecodeError, ValueError): detail = str(e) raise ApiError(e.code, detail) from e except urllib.error.URLError as e: raise ApiError(0, "Cannot reach API") from e def _api_post(base_url: str, path: str, token: str, body: dict | None = None) -> dict: url = f"{base_url.rstrip('/')}{path}" encoded = None headers = {"Accept": "application/json", "Authorization": f"Bearer {token}"} if body is not None: encoded = json.dumps(body).encode() headers["Content-Type"] = "application/json" req = urllib.request.Request(url, data=encoded, headers=headers, method="POST") try: with urllib.request.urlopen(req, timeout=10) as resp: raw = resp.read().decode() try: return json.loads(raw) except json.JSONDecodeError as exc: raise ApiError(resp.status, "Invalid API response") from exc except urllib.error.HTTPError as e: try: detail = json.loads(e.read().decode()).get("detail", str(e)) except (json.JSONDecodeError, ValueError): detail = str(e) raise ApiError(e.code, detail) from e except urllib.error.URLError as e: raise ApiError(0, "Cannot reach API") from e # --- Formatting -------------------------------------------------------------- def format_dt(iso_string: str | None, tz_name: str = "America/New_York") -> str: if not iso_string: return "never" dt = datetime.fromisoformat(iso_string) local = dt.astimezone(ZoneInfo(tz_name)) return local.strftime("%a %b %-d, %-I:%M %p %Z") _MAX_IRC_LINE = 430 def format_track(track: dict) -> str: pos = track.get("position", 0) title = track.get("title", "") artist = track.get("artist", "") url = track.get("permalink_url", "") return f"Song #{pos}: {title} by {artist} - {url}" def format_playlist(data: dict) -> str: episode = data.get("episode_number", "?") tracks = data.get("tracks", []) count = len(tracks) prefix = f"Episode {episode} ({count} tracks): " parts: list[str] = [] length = len(prefix) for t in tracks: entry = f"{t.get('title', '')} by {t.get('artist', '')}" sep = ", " if parts else "" if length + len(sep) + len(entry) + 5 > _MAX_IRC_LINE: # +5 for ", ..." parts.append("...") break parts.append(entry) length += len(sep) + len(entry) return prefix + ", ".join(parts) # --- Plugin ------------------------------------------------------------------ _NUMBER_RE = re.compile(r"^!(\d+)$") class NtrPlaylist(callbacks.Plugin): """Query the NtR SoundCloud Fetcher API from IRC.""" def __init__(self, irc): super().__init__(irc) self._irc = irc self._ws_stop = threading.Event() self._ws_thread = threading.Thread(target=self._ws_listener, daemon=True) self._ws_thread.start() def die(self): self._ws_stop.set() super().die() def _ws_listener(self): import websocket from supybot import ircmsgs backoff = 5 max_backoff = 60 LOGGER.info("WS listener thread started") while not self._ws_stop.is_set(): ws_url = self.registryValue("wsUrl") if ws_url.startswith("https://"): ws_url = "wss://" + ws_url[8:] elif ws_url.startswith("http://"): ws_url = "ws://" + ws_url[7:] token = self.registryValue("adminToken") channel = self.registryValue("announceChannel") client_id = self.registryValue("clientId") or "limnoria" if not ws_url or not token: LOGGER.warning("wsUrl or adminToken not configured, WS listener sleeping") self._ws_stop.wait(30) continue LOGGER.info("Connecting to %s as client_id=%s", ws_url, client_id) ws = None try: ws = websocket.WebSocket() ws.connect(ws_url, timeout=10) LOGGER.info("WebSocket TCP connection established") subscribe_msg = { "type": "subscribe", "token": token, "role": "bot", "client_id": client_id, } ws.send(json.dumps(subscribe_msg)) LOGGER.info("Sent subscribe message (role=bot, client_id=%s)", client_id) backoff = 5 while not self._ws_stop.is_set(): ws.settimeout(5) try: raw = ws.recv() if not raw: LOGGER.warning("Received empty message, connection closing") break data = json.loads(raw) LOGGER.debug("Received WS message: type=%s", data.get("type")) if data.get("type") == "announce" and "message" in data: msg = ircmsgs.privmsg(channel, data["message"]) self._irc.queueMsg(msg) LOGGER.info("Announced to %s: %s", channel, data["message"]) elif data.get("type") == "status": LOGGER.info( "Status update: %d bot(s) connected, clients=%s", data.get("subscribers", 0), [c.get("client_id") for c in data.get("clients", [])], ) except websocket.WebSocketTimeoutException: continue except websocket.WebSocketConnectionClosedException: LOGGER.warning("WebSocket connection closed by server") break except ConnectionRefusedError: LOGGER.warning("Connection refused at %s", ws_url) except TimeoutError: LOGGER.warning("Connection timed out to %s", ws_url) except Exception: LOGGER.exception("WS listener error") finally: if ws: try: ws.close() LOGGER.debug("WebSocket closed cleanly") except Exception: pass if not self._ws_stop.is_set(): LOGGER.info("Reconnecting in %ds", backoff) self._ws_stop.wait(backoff) backoff = min(backoff * 2, max_backoff) def _is_admin(self, nick: str) -> bool: admin_nicks = self.registryValue("adminNicks") if not admin_nicks: return False return nick.lower() in [n.lower() for n in admin_nicks] def doPrivmsg(self, irc, msg): channel = msg.args[0] if msg.args else None if not channel or not irc.isChannel(channel): super().doPrivmsg(irc, msg) return text = msg.args[1] if len(msg.args) > 1 else "" match = _NUMBER_RE.match(text) if match: position = match.group(1) base_url = self.registryValue("apiBaseUrl") try: data = _api_get(base_url, f"/playlist/{position}") irc.reply(format_track(data)) except ApiError as exc: LOGGER.warning("API error for !%s: %s", position, exc) irc.reply(exc.detail) super().doPrivmsg(irc, msg) @wrap([optional("text")]) def song(self, irc, msg, args, text): """ Returns a track from a specific episode's playlist. """ if not text: irc.reply("Usage: !song ") return parts = text.strip().split() if len(parts) < 2: irc.reply("Usage: !song ") return try: episode, position = int(parts[0]), int(parts[1]) except ValueError: irc.reply("Usage: !song ") return base_url = self.registryValue("apiBaseUrl") try: data = _api_get(base_url, f"/shows/by-episode/{episode}") except ApiError as exc: LOGGER.warning("API error for !song %s %s: %s", episode, position, exc) irc.reply(exc.detail) return tracks = data.get("tracks", []) track = next((t for t in tracks if t.get("position") == position), None) if not track: irc.reply(f"No track at position {position} in episode {episode}") return irc.reply(format_track(track)) @wrap([optional("text")]) def playlist(self, irc, msg, args, text): """[] Returns the playlist for the current show, or a specific episode. """ base_url = self.registryValue("apiBaseUrl") if text and text.strip(): try: episode = int(text.strip()) except ValueError: irc.reply("Usage: !playlist [episode]") return try: data = _api_get(base_url, f"/shows/by-episode/{episode}") except ApiError as exc: LOGGER.warning("API error for playlist: %s", exc) irc.reply(exc.detail) return else: try: data = _api_get(base_url, "/playlist") except ApiError as exc: LOGGER.warning("API error for playlist: %s", exc) irc.reply(exc.detail) return irc.reply(format_playlist(data)) @wrap([optional("text")]) def lastshow(self, irc, msg, args, text): """ Returns a track from last week's show by position number. """ if not text or not text.strip(): irc.reply("Usage: !lastshow ") return try: position = int(text.strip()) except ValueError: irc.reply("Usage: !lastshow ") return base_url = self.registryValue("apiBaseUrl") try: shows = _api_get(base_url, "/shows?limit=2") except ApiError as exc: LOGGER.warning("API error for lastshow: %s", exc) irc.reply(exc.detail) return if len(shows) < 2: irc.reply("No previous show found") return prev_show_id = shows[1]["id"] try: data = _api_get(base_url, f"/shows/{prev_show_id}") except ApiError as exc: LOGGER.warning("API error for lastshow show %s: %s", prev_show_id, exc) irc.reply(exc.detail) return tracks = data.get("tracks", []) track = next((t for t in tracks if t.get("position") == position), None) if not track: episode = data.get("episode_number", "?") irc.reply(f"No track at position {position} in episode {episode}") return irc.reply(format_track(track)) @wrap def status(self, irc, msg, args): """takes no arguments Returns the current API status. """ base_url = self.registryValue("apiBaseUrl") try: data = _api_get(base_url, "/health") except ApiError as exc: LOGGER.warning("API error for status: %s", exc) irc.reply(exc.detail) return tz = self.registryValue("displayTimezone") status = data.get("status", "unknown") poller = "alive" if data.get("poller_alive") else "dead" last_fetch = format_dt(data.get("last_fetch"), tz) count = data.get("current_week_track_count", 0) irc.reply( f"Status: {status.upper()} | Poller: {poller} | Last fetch: {last_fetch} | Tracks this week: {count}" ) @wrap def refresh(self, irc, msg, args): """takes no arguments Triggers a manual playlist refresh. Admin only. """ if not self._is_admin(msg.nick): irc.reply("Access denied") return token = self.registryValue("adminToken") if not token: irc.reply("Admin token not configured") return base_url = self.registryValue("apiBaseUrl") try: data = _api_post(base_url, "/admin/refresh", token) except ApiError as exc: LOGGER.warning("API error for refresh: %s", exc) irc.reply(f"Refresh failed: {exc.detail}") return count = data.get("track_count", 0) irc.reply(f"Refreshed — {count} tracks") Class = NtrPlaylist