Files
NtR-soudcloud-fetcher/plugins/limnoria/NtrPlaylist/plugin.py
cottongin 911dd3d5dd feat: tabbed show interface and copy-to-clipboard button
Replace the link column with a Copy button that copies
"Title by Artist - URL" to clipboard. Replace the current/previous
show layout with a horizontal scrollable tab bar showing all shows
from the database, most recent first. Tabs lazy-load and cache
show data on click.

Made-with: Cursor
2026-03-12 08:15:22 -04:00

386 lines
14 KiB
Python

"""
NtR Playlist — Limnoria plugin for NtR SoundCloud Fetcher API.
"""
from __future__ import annotations
import json
import logging
import threading
import re
import urllib.error
import urllib.request
from datetime import datetime
from zoneinfo import ZoneInfo
from supybot import callbacks
from supybot.commands import optional, wrap
LOGGER = logging.getLogger(__name__)
# --- API helpers -------------------------------------------------------------
class ApiError(Exception):
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
super().__init__(f"{status_code}: {detail}")
def _api_get(base_url: str, path: str) -> dict:
url = f"{base_url.rstrip('/')}{path}"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
raw = resp.read().decode()
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise ApiError(resp.status, "Invalid API response") from exc
except urllib.error.HTTPError as e:
try:
detail = json.loads(e.read().decode()).get("detail", str(e))
except (json.JSONDecodeError, ValueError):
detail = str(e)
raise ApiError(e.code, detail) from e
except urllib.error.URLError as e:
raise ApiError(0, "Cannot reach API") from e
def _api_post(base_url: str, path: str, token: str, body: dict | None = None) -> dict:
url = f"{base_url.rstrip('/')}{path}"
encoded = None
headers = {"Accept": "application/json", "Authorization": f"Bearer {token}"}
if body is not None:
encoded = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=encoded, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=10) as resp:
raw = resp.read().decode()
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise ApiError(resp.status, "Invalid API response") from exc
except urllib.error.HTTPError as e:
try:
detail = json.loads(e.read().decode()).get("detail", str(e))
except (json.JSONDecodeError, ValueError):
detail = str(e)
raise ApiError(e.code, detail) from e
except urllib.error.URLError as e:
raise ApiError(0, "Cannot reach API") from e
# --- Formatting --------------------------------------------------------------
def format_dt(iso_string: str | None, tz_name: str = "America/New_York") -> str:
if not iso_string:
return "never"
dt = datetime.fromisoformat(iso_string)
local = dt.astimezone(ZoneInfo(tz_name))
return local.strftime("%a %b %-d, %-I:%M %p %Z")
_MAX_IRC_LINE = 430
def format_track(track: dict) -> str:
pos = track.get("position", 0)
title = track.get("title", "")
artist = track.get("artist", "")
url = track.get("permalink_url", "")
return f"Song #{pos}: {title} by {artist} - {url}"
def format_playlist(data: dict) -> str:
episode = data.get("episode_number", "?")
tracks = data.get("tracks", [])
count = len(tracks)
prefix = f"Episode {episode} ({count} tracks): "
parts: list[str] = []
length = len(prefix)
for t in tracks:
entry = f"{t.get('title', '')} by {t.get('artist', '')}"
sep = ", " if parts else ""
if length + len(sep) + len(entry) + 5 > _MAX_IRC_LINE: # +5 for ", ..."
parts.append("...")
break
parts.append(entry)
length += len(sep) + len(entry)
return prefix + ", ".join(parts)
# --- Plugin ------------------------------------------------------------------
_NUMBER_RE = re.compile(r"^!(\d+)$")
class NtrPlaylist(callbacks.Plugin):
"""Query the NtR SoundCloud Fetcher API from IRC."""
def __init__(self, irc):
super().__init__(irc)
self._irc = irc
self._ws_stop = threading.Event()
self._ws_thread = threading.Thread(target=self._ws_listener, daemon=True)
self._ws_thread.start()
def die(self):
self._ws_stop.set()
super().die()
def _ws_listener(self):
import websocket
from supybot import ircmsgs
backoff = 5
max_backoff = 60
LOGGER.info("WS listener thread started")
while not self._ws_stop.is_set():
ws_url = self.registryValue("wsUrl")
if ws_url.startswith("https://"):
ws_url = "wss://" + ws_url[8:]
elif ws_url.startswith("http://"):
ws_url = "ws://" + ws_url[7:]
token = self.registryValue("adminToken")
channel = self.registryValue("announceChannel")
client_id = self.registryValue("clientId") or "limnoria"
if not ws_url or not token:
LOGGER.warning("wsUrl or adminToken not configured, WS listener sleeping")
self._ws_stop.wait(30)
continue
LOGGER.info("Connecting to %s as client_id=%s", ws_url, client_id)
ws = None
try:
ws = websocket.WebSocket()
ws.connect(ws_url, timeout=10)
LOGGER.info("WebSocket TCP connection established")
subscribe_msg = {
"type": "subscribe",
"token": token,
"role": "bot",
"client_id": client_id,
}
ws.send(json.dumps(subscribe_msg))
LOGGER.info("Sent subscribe message (role=bot, client_id=%s)", client_id)
backoff = 5
while not self._ws_stop.is_set():
ws.settimeout(5)
try:
raw = ws.recv()
if not raw:
LOGGER.warning("Received empty message, connection closing")
break
data = json.loads(raw)
LOGGER.debug("Received WS message: type=%s", data.get("type"))
if data.get("type") == "announce" and "message" in data:
msg = ircmsgs.privmsg(channel, data["message"])
self._irc.queueMsg(msg)
LOGGER.info("Announced to %s: %s", channel, data["message"])
elif data.get("type") == "status":
LOGGER.info(
"Status update: %d bot(s) connected, clients=%s",
data.get("subscribers", 0),
[c.get("client_id") for c in data.get("clients", [])],
)
except websocket.WebSocketTimeoutException:
continue
except websocket.WebSocketConnectionClosedException:
LOGGER.warning("WebSocket connection closed by server")
break
except ConnectionRefusedError:
LOGGER.warning("Connection refused at %s", ws_url)
except TimeoutError:
LOGGER.warning("Connection timed out to %s", ws_url)
except Exception:
LOGGER.exception("WS listener error")
finally:
if ws:
try:
ws.close()
LOGGER.debug("WebSocket closed cleanly")
except Exception:
pass
if not self._ws_stop.is_set():
LOGGER.info("Reconnecting in %ds", backoff)
self._ws_stop.wait(backoff)
backoff = min(backoff * 2, max_backoff)
def _is_admin(self, nick: str) -> bool:
admin_nicks = self.registryValue("adminNicks")
if not admin_nicks:
return False
return nick.lower() in [n.lower() for n in admin_nicks]
def doPrivmsg(self, irc, msg):
channel = msg.args[0] if msg.args else None
if not channel or not irc.isChannel(channel):
return
text = msg.args[1] if len(msg.args) > 1 else ""
match = _NUMBER_RE.match(text)
if match:
position = match.group(1)
base_url = self.registryValue("apiBaseUrl")
try:
data = _api_get(base_url, f"/playlist/{position}")
irc.reply(format_track(data))
except ApiError as exc:
LOGGER.warning("API error for !%s: %s", position, exc)
irc.reply(exc.detail)
@wrap([optional("text")])
def song(self, irc, msg, args, text):
"""<episode> <position>
Returns a track from a specific episode's playlist.
"""
if not text:
irc.reply("Usage: !song <episode> <position>")
return
parts = text.strip().split()
if len(parts) < 2:
irc.reply("Usage: !song <episode> <position>")
return
try:
episode, position = int(parts[0]), int(parts[1])
except ValueError:
irc.reply("Usage: !song <episode> <position>")
return
base_url = self.registryValue("apiBaseUrl")
try:
data = _api_get(base_url, f"/shows/by-episode/{episode}")
except ApiError as exc:
LOGGER.warning("API error for !song %s %s: %s", episode, position, exc)
irc.reply(exc.detail)
return
tracks = data.get("tracks", [])
track = next((t for t in tracks if t.get("position") == position), None)
if not track:
irc.reply(f"No track at position {position} in episode {episode}")
return
irc.reply(format_track(track))
@wrap([optional("text")])
def playlist(self, irc, msg, args, text):
"""[<episode>]
Returns the playlist for the current show, or a specific episode.
"""
base_url = self.registryValue("apiBaseUrl")
if text and text.strip():
try:
episode = int(text.strip())
except ValueError:
irc.reply("Usage: !playlist [episode]")
return
try:
data = _api_get(base_url, f"/shows/by-episode/{episode}")
except ApiError as exc:
LOGGER.warning("API error for playlist: %s", exc)
irc.reply(exc.detail)
return
else:
try:
data = _api_get(base_url, "/playlist")
except ApiError as exc:
LOGGER.warning("API error for playlist: %s", exc)
irc.reply(exc.detail)
return
irc.reply(format_playlist(data))
@wrap([optional("text")])
def lastshow(self, irc, msg, args, text):
"""<position>
Returns a track from last week's show by position number.
"""
if not text or not text.strip():
irc.reply("Usage: !lastshow <position>")
return
try:
position = int(text.strip())
except ValueError:
irc.reply("Usage: !lastshow <position>")
return
base_url = self.registryValue("apiBaseUrl")
try:
shows = _api_get(base_url, "/shows?limit=2")
except ApiError as exc:
LOGGER.warning("API error for lastshow: %s", exc)
irc.reply(exc.detail)
return
if len(shows) < 2:
irc.reply("No previous show found")
return
prev_show_id = shows[1]["id"]
try:
data = _api_get(base_url, f"/shows/{prev_show_id}")
except ApiError as exc:
LOGGER.warning("API error for lastshow show %s: %s", prev_show_id, exc)
irc.reply(exc.detail)
return
tracks = data.get("tracks", [])
track = next((t for t in tracks if t.get("position") == position), None)
if not track:
episode = data.get("episode_number", "?")
irc.reply(f"No track at position {position} in episode {episode}")
return
irc.reply(format_track(track))
@wrap
def status(self, irc, msg, args):
"""takes no arguments
Returns the current API status.
"""
base_url = self.registryValue("apiBaseUrl")
try:
data = _api_get(base_url, "/health")
except ApiError as exc:
LOGGER.warning("API error for status: %s", exc)
irc.reply(exc.detail)
return
tz = self.registryValue("displayTimezone")
status = data.get("status", "unknown")
poller = "alive" if data.get("poller_alive") else "dead"
last_fetch = format_dt(data.get("last_fetch"), tz)
count = data.get("current_week_track_count", 0)
irc.reply(
f"Status: {status.upper()} | Poller: {poller} | Last fetch: {last_fetch} | Tracks this week: {count}"
)
@wrap
def refresh(self, irc, msg, args):
"""takes no arguments
Triggers a manual playlist refresh. Admin only.
"""
if not self._is_admin(msg.nick):
irc.reply("Access denied")
return
token = self.registryValue("adminToken")
if not token:
irc.reply("Admin token not configured")
return
base_url = self.registryValue("apiBaseUrl")
try:
data = _api_post(base_url, "/admin/refresh", token)
except ApiError as exc:
LOGGER.warning("API error for refresh: %s", exc)
irc.reply(f"Refresh failed: {exc.detail}")
return
count = data.get("track_count", 0)
irc.reply(f"Refreshed — {count} tracks")
Class = NtrPlaylist