Files
NtR-soudcloud-fetcher/plugins/limnoria/NtrPlaylist/plugin.py
cottongin d6d5ac10e6 fix: separate bot vs viewer WebSocket connections, add client identification
The dashboard's own WS connection was being counted as a bot subscriber,
causing "1 bot connected" with no bots actually present. Now WS clients
send a role ("bot" or "viewer") in the subscribe message. Only bots count
toward the subscriber total. Bot plugins also send a configurable client_id
so the dashboard shows which specific bots are connected.

Made-with: Cursor
2026-03-12 07:51:55 -04:00

366 lines
12 KiB
Python

"""
NtR Playlist — Limnoria plugin for NtR SoundCloud Fetcher API.
"""
from __future__ import annotations
import json
import logging
import threading
import re
import urllib.error
import urllib.request
from datetime import datetime
from zoneinfo import ZoneInfo
from supybot import callbacks
from supybot.commands import optional, wrap
# Module-level logger named after this module; the API helpers' callers and
# the WebSocket listener below report warnings and errors through it.
LOGGER = logging.getLogger(__name__)
# --- API helpers -------------------------------------------------------------
class ApiError(Exception):
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
super().__init__(f"{status_code}: {detail}")
def _api_get(base_url: str, path: str) -> dict:
url = f"{base_url.rstrip('/')}{path}"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
raw = resp.read().decode()
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise ApiError(resp.status, "Invalid API response") from exc
except urllib.error.HTTPError as e:
try:
detail = json.loads(e.read().decode()).get("detail", str(e))
except (json.JSONDecodeError, ValueError):
detail = str(e)
raise ApiError(e.code, detail) from e
except urllib.error.URLError as e:
raise ApiError(0, "Cannot reach API") from e
def _api_post(base_url: str, path: str, token: str, body: dict | None = None) -> dict:
url = f"{base_url.rstrip('/')}{path}"
encoded = None
headers = {"Accept": "application/json", "Authorization": f"Bearer {token}"}
if body is not None:
encoded = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=encoded, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=10) as resp:
raw = resp.read().decode()
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise ApiError(resp.status, "Invalid API response") from exc
except urllib.error.HTTPError as e:
try:
detail = json.loads(e.read().decode()).get("detail", str(e))
except (json.JSONDecodeError, ValueError):
detail = str(e)
raise ApiError(e.code, detail) from e
except urllib.error.URLError as e:
raise ApiError(0, "Cannot reach API") from e
# --- Formatting --------------------------------------------------------------
def format_dt(iso_string: str | None, tz_name: str = "America/New_York") -> str:
if not iso_string:
return "never"
dt = datetime.fromisoformat(iso_string)
local = dt.astimezone(ZoneInfo(tz_name))
return local.strftime("%a %b %-d, %-I:%M %p %Z")
# Length budget for one reply line; format_playlist() stops adding tracks and
# appends "..." once the next entry would push the line past this value.
# NOTE(review): presumably chosen to stay under IRC's 512-byte message limit
# with headroom for the protocol prefix — confirm.
_MAX_IRC_LINE = 430
def format_track(track: dict) -> str:
    """Build the one-line IRC description of a single track.

    Missing keys fall back to position 0 and empty strings, so a partial
    track record still yields a line rather than raising.
    """
    fields = (
        track.get("position", 0),
        track.get("title", ""),
        track.get("artist", ""),
        track.get("permalink_url", ""),
    )
    return "Song #{}: {} by {} - {}".format(*fields)
def format_playlist(data: dict) -> str:
    """Summarise a playlist on one line: "Episode N (K tracks): A by B, C by D, ...".

    Tracks are appended in order until the next one would push the line past
    ``_MAX_IRC_LINE``; the list is then cut short with a trailing "...".

    Args:
        data: Playlist payload with optional "episode_number" and "tracks"
            keys. A missing or null "tracks" is treated as an empty playlist.

    Returns:
        The summary line. Its UTF-8 encoding never exceeds ``_MAX_IRC_LINE``
        bytes (provided the prefix alone fits).
    """
    episode = data.get("episode_number", "?")
    # "tracks" may be present but null; len(None) would raise.
    tracks = data.get("tracks") or []
    prefix = f"Episode {episode} ({len(tracks)} tracks): "
    parts: list[str] = []
    # The IRC line limit is counted in bytes, not code points, so measure the
    # encoded size; otherwise non-ASCII titles overshoot the budget.
    # NOTE(review): assumes the bot sends UTF-8 — confirm.
    used = len(prefix.encode("utf-8"))
    for t in tracks:
        entry = f"{t.get('title', '')} by {t.get('artist', '')}"
        sep = ", " if parts else ""
        cost = len(sep) + len(entry.encode("utf-8"))
        # Always keep 5 bytes in reserve for a possible ", ..." suffix.
        if used + cost + 5 > _MAX_IRC_LINE:
            parts.append("...")
            break
        parts.append(entry)
        used += cost
    return prefix + ", ".join(parts)
# --- Plugin ------------------------------------------------------------------
# Matches a message that consists solely of "!" followed by digits (e.g. "!7").
# NtrPlaylist.doPrivmsg() uses the captured digits as the track position to
# look up in the current playlist.
_NUMBER_RE = re.compile(r"^!(\d+)$")
class NtrPlaylist(callbacks.Plugin):
    """Query the NtR SoundCloud Fetcher API from IRC."""

    # NOTE: the docstrings of the command methods (song, playlist, lastshow,
    # status, refresh) are left exactly as written; presumably Limnoria shows
    # them as the commands' help text, so they are runtime strings.

    def __init__(self, irc):
        super().__init__(irc)
        # Kept so the background listener can queue announcement messages.
        self._irc = irc
        # Set by die() to ask the listener thread to exit.
        self._ws_stop = threading.Event()
        self._ws_thread = threading.Thread(target=self._ws_listener, daemon=True)
        self._ws_thread.start()

    def die(self):
        # Signal the listener; it notices on its next recv timeout or wait().
        self._ws_stop.set()
        super().die()

    def _ws_listener(self):
        """Background loop that relays "announce" WebSocket messages to IRC.

        Connects to ``wsUrl``, subscribes as a bot using ``adminToken`` and
        ``clientId``, and forwards each announcement to ``announceChannel``.
        On any connection error it reconnects with exponential backoff
        (5 s doubling up to 60 s, reset after a successful connect). Runs
        until ``self._ws_stop`` is set.
        """
        import websocket

        backoff = 5
        max_backoff = 60
        while not self._ws_stop.is_set():
            # Configuration is re-read on every (re)connect attempt.
            ws_url = self.registryValue("wsUrl")
            token = self.registryValue("adminToken")
            channel = self.registryValue("announceChannel")
            client_id = self.registryValue("clientId") or "limnoria"
            if not ws_url or not token:
                LOGGER.warning("wsUrl or adminToken not configured, WS listener sleeping")
                self._ws_stop.wait(30)
                continue
            ws = None
            try:
                ws = websocket.WebSocket()
                ws.connect(ws_url, timeout=10)
                ws.send(json.dumps({
                    "type": "subscribe",
                    "token": token,
                    "role": "bot",
                    "client_id": client_id,
                }))
                LOGGER.info("Connected to announce WebSocket at %s", ws_url)
                backoff = 5
                # Short receive timeout so the stop flag is polled regularly.
                ws.settimeout(5)
                while not self._ws_stop.is_set():
                    try:
                        raw = ws.recv()
                    except websocket.WebSocketTimeoutException:
                        continue
                    except websocket.WebSocketConnectionClosedException:
                        break
                    if not raw:
                        break
                    self._handle_ws_frame(raw, channel)
            except Exception:
                # Top-level boundary of the thread: log and fall through to reconnect.
                LOGGER.exception("WS listener error")
            finally:
                if ws is not None:
                    try:
                        ws.close()
                    except Exception:
                        # Best-effort close; the socket is being discarded anyway.
                        pass
            if not self._ws_stop.is_set():
                LOGGER.info("Reconnecting in %ds", backoff)
                self._ws_stop.wait(backoff)
                backoff = min(backoff * 2, max_backoff)

    def _handle_ws_frame(self, raw, channel):
        """Process one received WebSocket frame; queue an IRC message for announcements.

        Malformed or irrelevant frames are ignored rather than raised, so a
        single bad frame does not tear down an otherwise healthy connection.
        """
        from supybot import ircmsgs

        try:
            data = json.loads(raw)
        except ValueError:
            LOGGER.warning("Ignoring malformed WS frame")
            return
        if not isinstance(data, dict):
            return
        if data.get("type") != "announce" or "message" not in data:
            return
        if not channel:
            LOGGER.warning("announceChannel not configured, dropping announcement")
            return
        msg = ircmsgs.privmsg(channel, data["message"])
        self._irc.queueMsg(msg)
        LOGGER.info("Announced to %s: %s", channel, data["message"])

    @staticmethod
    def _find_track(data, position):
        """Return the track dict whose "position" equals *position*, or None."""
        tracks = data.get("tracks") or []
        return next((t for t in tracks if t.get("position") == position), None)

    def _is_admin(self, nick: str) -> bool:
        """Return True when *nick* is listed (case-insensitively) in ``adminNicks``."""
        admin_nicks = self.registryValue("adminNicks")
        if not admin_nicks:
            return False
        wanted = nick.lower()
        return any(n.lower() == wanted for n in admin_nicks)

    def doPrivmsg(self, irc, msg):
        # Handles the bare "!<n>" shortcut in channels: reply with track <n>
        # of the current playlist. Everything is also passed on to the parent.
        channel = msg.args[0] if msg.args else None
        if not channel or not irc.isChannel(channel):
            super().doPrivmsg(irc, msg)
            return
        text = msg.args[1] if len(msg.args) > 1 else ""
        match = _NUMBER_RE.match(text)
        if match:
            position = match.group(1)
            base_url = self.registryValue("apiBaseUrl")
            try:
                data = _api_get(base_url, f"/playlist/{position}")
                irc.reply(format_track(data))
            except ApiError as exc:
                LOGGER.warning("API error for !%s: %s", position, exc)
                irc.reply(exc.detail)
        super().doPrivmsg(irc, msg)

    @wrap([optional("text")])
    def song(self, irc, msg, args, text):
        """<episode> <position>
        Returns a track from a specific episode's playlist.
        """
        if not text:
            irc.reply("Usage: !song <episode> <position>")
            return
        parts = text.strip().split()
        if len(parts) < 2:
            irc.reply("Usage: !song <episode> <position>")
            return
        try:
            episode, position = int(parts[0]), int(parts[1])
        except ValueError:
            irc.reply("Usage: !song <episode> <position>")
            return
        base_url = self.registryValue("apiBaseUrl")
        try:
            data = _api_get(base_url, f"/shows/by-episode/{episode}")
        except ApiError as exc:
            LOGGER.warning("API error for !song %s %s: %s", episode, position, exc)
            irc.reply(exc.detail)
            return
        track = self._find_track(data, position)
        if track is None:
            irc.reply(f"No track at position {position} in episode {episode}")
            return
        irc.reply(format_track(track))

    @wrap([optional("text")])
    def playlist(self, irc, msg, args, text):
        """[<episode>]
        Returns the playlist for the current show, or a specific episode.
        """
        base_url = self.registryValue("apiBaseUrl")
        # Default to the current playlist; an episode number switches endpoint.
        path = "/playlist"
        if text and text.strip():
            try:
                episode = int(text.strip())
            except ValueError:
                irc.reply("Usage: !playlist [episode]")
                return
            path = f"/shows/by-episode/{episode}"
        try:
            data = _api_get(base_url, path)
        except ApiError as exc:
            LOGGER.warning("API error for playlist: %s", exc)
            irc.reply(exc.detail)
            return
        irc.reply(format_playlist(data))

    @wrap([optional("text")])
    def lastshow(self, irc, msg, args, text):
        """<position>
        Returns a track from last week's show by position number.
        """
        if not text or not text.strip():
            irc.reply("Usage: !lastshow <position>")
            return
        try:
            position = int(text.strip())
        except ValueError:
            irc.reply("Usage: !lastshow <position>")
            return
        base_url = self.registryValue("apiBaseUrl")
        try:
            shows = _api_get(base_url, "/shows?limit=2")
        except ApiError as exc:
            LOGGER.warning("API error for lastshow: %s", exc)
            irc.reply(exc.detail)
            return
        # The second entry is taken as the previous show; guard against an
        # unexpected response shape instead of raising KeyError/TypeError.
        previous = shows[1] if isinstance(shows, list) and len(shows) >= 2 else None
        prev_show_id = previous.get("id") if isinstance(previous, dict) else None
        if prev_show_id is None:
            irc.reply("No previous show found")
            return
        try:
            data = _api_get(base_url, f"/shows/{prev_show_id}")
        except ApiError as exc:
            LOGGER.warning("API error for lastshow show %s: %s", prev_show_id, exc)
            irc.reply(exc.detail)
            return
        track = self._find_track(data, position)
        if track is None:
            episode = data.get("episode_number", "?")
            irc.reply(f"No track at position {position} in episode {episode}")
            return
        irc.reply(format_track(track))

    @wrap
    def status(self, irc, msg, args):
        """takes no arguments
        Returns the current API status.
        """
        base_url = self.registryValue("apiBaseUrl")
        try:
            data = _api_get(base_url, "/health")
        except ApiError as exc:
            LOGGER.warning("API error for status: %s", exc)
            irc.reply(exc.detail)
            return
        tz = self.registryValue("displayTimezone")
        # "status" may be absent or null; either way fall back to "unknown".
        status = str(data.get("status") or "unknown")
        poller = "alive" if data.get("poller_alive") else "dead"
        last_fetch = format_dt(data.get("last_fetch"), tz)
        count = data.get("current_week_track_count", 0)
        irc.reply(
            f"Status: {status.upper()} | Poller: {poller} | Last fetch: {last_fetch} | Tracks this week: {count}"
        )

    @wrap
    def refresh(self, irc, msg, args):
        """takes no arguments
        Triggers a manual playlist refresh. Admin only.
        """
        if not self._is_admin(msg.nick):
            irc.reply("Access denied")
            return
        token = self.registryValue("adminToken")
        if not token:
            irc.reply("Admin token not configured")
            return
        base_url = self.registryValue("apiBaseUrl")
        try:
            data = _api_post(base_url, "/admin/refresh", token)
        except ApiError as exc:
            LOGGER.warning("API error for refresh: %s", exc)
            irc.reply(f"Refresh failed: {exc.detail}")
            return
        count = data.get("track_count", 0)
        irc.reply(f"Refreshed — {count} tracks")
# Module-level alias for the plugin class.
# NOTE(review): presumably the name Limnoria's plugin loader looks up — confirm.
Class = NtrPlaylist