Initial commit: SongRequest Limnoria plugin

IRC song request plugin with iTunes validation, HTMX web dashboard,
WebSocket real-time updates, moderation, rate limiting, and history.

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-28 04:43:50 -04:00
commit 6723413250
11 changed files with 2287 additions and 0 deletions

15
.gitignore vendored Normal file
View File

@@ -0,0 +1,15 @@
__pycache__/
*.py[cod]
*.db
*.db-journal
*.db-wal
*.db-shm
.env
*.egg-info/
dist/
build/
chat-summaries/
conf/
data/
logs/
backup/

115
README.md Normal file
View File

@@ -0,0 +1,115 @@
# SongRequest — Limnoria IRC Song Request Plugin
A Limnoria (Supybot) plugin that watches IRC channels for song requests, validates them against the iTunes Search API, and streams them in real time to an HTMX web dashboard via WebSocket.
## Features
- **Passive detection** — recognizes `Artist - Title` patterns in chat and validates against Apple Music
- **Explicit command** — `!request Artist - Title` for direct requests
- **Disambiguation** — presents top matches when multiple results found; user picks by number
- **Web dashboard** — HTMX-powered UI with album art, Apple Music links, and moderation controls
- **Real-time updates** — WebSocket pushes new requests and status changes to all connected dashboards
- **Moderation** — approve, reject, or mark requests as played from the web UI
- **Rate limiting** — configurable per-user request limits
- **Ignore list** — block specific users from making requests
- **Persistence** — SQLite-backed; survives bot restarts
- **IRC announcements** — optionally announces status changes back to the originating channel
- **Quiet mode** — suppress the "Queued" IRC confirmation per-channel
- **Alternate matches** — when disambiguating, unchosen tracks appear as collapsible sub-cards
- **Clickable cards** — the entire request card links to Apple Music
- **Export history** — download request history as a Markdown file
- **Open/close requests** — global toggle (with per-channel override) from IRC or the web panel
- **Clear history** — wipe played/rejected entries from IRC or the web panel
## Dependencies
- `aiohttp` — standalone async web server for the dashboard
```bash
pip install aiohttp
```
## Installation
1. Copy the `SongRequest/` directory into your Limnoria plugins directory (or add its parent to `config directories.plugins`).
2. Load the plugin:
```
@load SongRequest
```
3. Configure enabled channels:
```
@config plugins.SongRequest.enabledChannels #music #requests
```
4. Set a web dashboard auth token:
```
@config plugins.SongRequest.webAuthToken your-secret-token-here
```
5. Access the dashboard at `http://<bot-host>:8888/` (default port, configurable via `webPort`).
## Configuration
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `enabledChannels` | Space-separated list | (empty) | Channels for passive detection |
| `ignoredUsers` | Space-separated list | (empty) | Nicks/hostmasks to ignore |
| `maxRequestsPerUser` | Integer | 10 | Max requests per rate limit window (0 = unlimited) |
| `rateLimitWindow` | Integer | 3600 | Rate limit window in seconds |
| `webAuthToken` | String (private) | (empty) | Auth token for web dashboard actions |
| `announceStatus` | Boolean | True | Announce status changes back to IRC |
| `maxChoices` | Integer | 3 | Disambiguation choices shown |
| `webPort` | Integer | 8888 | Port for the web dashboard server |
| `webHost` | String | 0.0.0.0 | Bind address for the web dashboard server |
| `requestsOpen` | Boolean | True | Global toggle — accept or reject new requests |
| `requestsOpenOverride` | String (per-channel) | (empty) | Per-channel override: `open`, `closed`, or empty for global |
| `quietQueued` | Boolean (per-channel) | False | Suppress the "Queued: ..." IRC confirmation |
| `passiveDetection` | Boolean (per-channel) | True | Enable passive pattern matching |
| `requestCommand` | Boolean (per-channel) | True | Enable the `!request` command |
## Commands
| Command | Description | Permission |
|---------|-------------|------------|
| `request <text>` | Request a song by artist/title | Anyone |
| `pick <number>` | Pick from disambiguation choices | Anyone |
| `ignore <nick>` | Add a nick to the ignore list | Admin |
| `unignore <nick>` | Remove a nick from the ignore list | Admin |
| `requeststats` | Show queue statistics | Anyone |
| `openrequests [channel]` | Open requests globally or per-channel | Admin |
| `closerequests [channel]` | Close requests globally or per-channel | Admin |
| `clearhistory` | Clear all played/rejected requests | Admin |
## Web Dashboard
The dashboard runs on a standalone aiohttp server (separate from Limnoria's built-in HTTP server) at the configured `webPort` (default `8888`). It shows:
- **Pending** requests awaiting moderation
- **Approved** requests ready to play
- **History** of played/rejected requests
Each request card is a clickable link to Apple Music and displays album art, song title, artist, album name, requester info, and action buttons (Approve / Reject / Mark Played). When a request had disambiguation, the alternate matches appear in a collapsible section below the main card.
The history tab includes **Export .md** (download as Markdown) and **Clear History** buttons. A toggle switch in the header opens/closes requests globally (synced in real time across all connected dashboards).
Real-time updates are delivered via WebSocket — the status indicator dot in the header shows green when connected and red when disconnected (with automatic reconnection).
Admin actions require the auth token, which is automatically injected into the dashboard page at serve time.
## How Passive Detection Works
1. The bot watches messages in `enabledChannels` for lines matching `Something - Something`
2. Lines starting with bot command prefixes (`!`, `.`, `@`, etc.) or URLs are skipped
3. The extracted text is searched against the iTunes Search API
4. If no song matches, the message is silently ignored (primary false-positive filter)
5. If one match, it's queued automatically
6. If multiple matches, the user is presented with choices
## Running Tests
```bash
pip install limnoria aiohttp pytest
python -m pytest SongRequest/test.py -v
```

32
SongRequest/__init__.py Normal file
View File

@@ -0,0 +1,32 @@
"""
SongRequest: Watch IRC channels for song requests, validate against iTunes,
and stream them to an HTMX web dashboard via WebSocket.
"""
import supybot
import supybot.world as world
from importlib import reload
from . import config
from . import store
from . import itunes
from . import web
from . import plugin
reload(config)
reload(store)
reload(itunes)
reload(web)
reload(plugin)
if world.testing:
from . import test
Class = plugin.Class
configure = config.configure
__version__ = "0.1.0"
__author__ = supybot.Author("cottongin", "cottongin", "")
__contributors__ = {}
__url__ = ""

181
SongRequest/config.py Normal file
View File

@@ -0,0 +1,181 @@
import supybot.conf as conf
import supybot.registry as registry
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization("SongRequest")
except ImportError:
_ = lambda x: x
def configure(advanced):
from supybot.questions import expect, anything, something, yn
conf.registerPlugin("SongRequest", True)
SongRequest = conf.registerPlugin("SongRequest")
conf.registerGlobalValue(
SongRequest,
"enabledChannels",
registry.SpaceSeparatedListOfStrings(
"",
_("""Space-separated list of channels to watch for passive
song request detection (e.g. #music #requests)."""),
),
)
conf.registerGlobalValue(
SongRequest,
"ignoredUsers",
registry.SpaceSeparatedListOfStrings(
"",
_("""Space-separated list of nicks or hostmasks to ignore.
Messages from these users are silently skipped."""),
),
)
conf.registerGlobalValue(
SongRequest,
"maxRequestsPerUser",
registry.NonNegativeInteger(
10,
_("""Maximum number of requests a single user can make within
the rate limit window. 0 disables rate limiting."""),
),
)
conf.registerGlobalValue(
SongRequest,
"rateLimitWindow",
registry.PositiveInteger(
3600,
_("""Duration in seconds for the per-user rate limit window."""),
),
)
conf.registerGlobalValue(
SongRequest,
"webAuthToken",
registry.String(
"",
_("""Secret token required for web dashboard admin actions.
Set this to a random string."""),
private=True,
),
)
conf.registerGlobalValue(
SongRequest,
"announceStatus",
registry.Boolean(
True,
_("""Master switch for all IRC status announcements. When False,
no status changes are announced regardless of the individual
announce* settings below."""),
),
)
conf.registerChannelValue(
SongRequest,
"announceApproved",
registry.Boolean(
True,
_("""Announce when a request is approved."""),
),
)
conf.registerChannelValue(
SongRequest,
"announceRejected",
registry.Boolean(
True,
_("""Announce when a request is rejected."""),
),
)
conf.registerChannelValue(
SongRequest,
"announceNowPlaying",
registry.Boolean(
True,
_("""Announce when a request is marked as now playing."""),
),
)
conf.registerGlobalValue(
SongRequest,
"maxChoices",
registry.PositiveInteger(
3,
_("""Number of disambiguation choices to present when multiple
iTunes matches are found."""),
),
)
conf.registerGlobalValue(
SongRequest,
"webPort",
registry.PositiveInteger(
8888,
_("""Port for the standalone web dashboard server."""),
),
)
conf.registerGlobalValue(
SongRequest,
"webHost",
registry.String(
"0.0.0.0",
_("""Bind address for the web dashboard server."""),
),
)
conf.registerGlobalValue(
SongRequest,
"requestsOpen",
registry.Boolean(
True,
_("""Global default for whether song requests are currently being
accepted. Can be overridden per-channel."""),
),
)
conf.registerChannelValue(
SongRequest,
"requestsOpenOverride",
registry.String(
"",
_("""Per-channel override for requestsOpen. Set to 'open', 'closed',
or leave empty to use the global default."""),
),
)
conf.registerChannelValue(
SongRequest,
"quietQueued",
registry.Boolean(
False,
_("""When True, suppress the 'Queued: ...' confirmation message
in IRC after a song request is accepted."""),
),
)
conf.registerChannelValue(
SongRequest,
"passiveDetection",
registry.Boolean(
True,
_("""Enable passive 'Artist - Title' pattern matching in this
channel. The channel must also be in enabledChannels."""),
),
)
conf.registerChannelValue(
SongRequest,
"requestCommand",
registry.Boolean(
True,
_("""Enable the !request command in this channel."""),
),
)

77
SongRequest/itunes.py Normal file
View File

@@ -0,0 +1,77 @@
import json
import urllib.request
import urllib.parse
from dataclasses import dataclass, asdict
from typing import List, Optional
import supybot.log as log
SEARCH_URL = "https://itunes.apple.com/search"
REQUEST_TIMEOUT = 10
@dataclass
class Track:
track_id: int
title: str
artist: str
album: str
artwork_url: str
apple_music_url: str
preview_url: Optional[str] = None
@classmethod
def from_itunes(cls, item: dict) -> "Track":
artwork = item.get("artworkUrl100", item.get("artworkUrl60", ""))
artwork_large = artwork.replace("100x100", "600x600") if artwork else ""
return cls(
track_id=item["trackId"],
title=item.get("trackName", ""),
artist=item.get("artistName", ""),
album=item.get("collectionName", ""),
artwork_url=artwork_large,
apple_music_url=item.get("trackViewUrl", ""),
preview_url=item.get("previewUrl"),
)
def display(self) -> str:
return f"{self.title} - {self.artist} ({self.album})"
def to_dict(self) -> dict:
return asdict(self)
def search(query: str, limit: int = 5) -> List[Track]:
"""Search the iTunes Search API for songs matching the query."""
params = urllib.parse.urlencode({
"term": query,
"media": "music",
"entity": "song",
"limit": limit,
})
url = f"{SEARCH_URL}?{params}"
try:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception:
log.exception("SongRequest: iTunes search failed for query %r", query)
return []
results = data.get("results", [])
tracks = []
for item in results:
if item.get("wrapperType") != "track" or item.get("kind") != "song":
continue
try:
tracks.append(Track.from_itunes(item))
except (KeyError, TypeError):
continue
return tracks
def search_artist_title(artist: str, title: str, limit: int = 5) -> List[Track]:
"""Search with both artist and title terms for better relevance."""
return search(f"{artist} {title}", limit=limit)

453
SongRequest/plugin.py Normal file
View File

@@ -0,0 +1,453 @@
import json
import os
import re
import time
import threading
import supybot.conf as conf
import supybot.utils as utils
import supybot.ircdb as ircdb
import supybot.ircmsgs as ircmsgs
import supybot.callbacks as callbacks
import supybot.log as log
import supybot.world as world
from supybot.commands import optional, wrap
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization("SongRequest")
except ImportError:
_ = lambda x: x
from . import itunes
from .store import RequestStore, SongRequest as SongRequestModel
from .web import WebServer, render_request_card
SONG_PATTERN = re.compile(
r"^(?!https?://)" # reject URLs
r"(?P<left>[A-Za-z0-9]" # starts with alphanumeric
r"[^-\n]{1,60})" # left side, up to ~60 chars
r"\s+[-\u2013\u2014]\s+" # dash separator with whitespace (-, en-dash, em-dash)
r"(?P<right>[A-Za-z0-9]" # right side starts alphanumeric
r"[^-\n]{1,60})$", # right side, up to ~60 chars
re.UNICODE,
)
CHOICE_TIMEOUT = 60
class SongRequest(callbacks.Plugin):
"""Watch IRC channels for song requests, validate them against iTunes,
and stream them to an HTMX web dashboard via WebSocket."""
threaded = True
def __init__(self, irc):
super().__init__(irc)
db_dir = conf.supybot.directories.data.dirize("SongRequest")
os.makedirs(db_dir, exist_ok=True)
db_path = os.path.join(db_dir, "requests.db")
self.store = RequestStore(db_path)
self._pending_choices = {}
self._choices_lock = threading.Lock()
self._rate_limits = {}
self._rate_lock = threading.Lock()
self._web_server = WebServer(
host=self.registryValue("webHost"),
port=self.registryValue("webPort"),
store=self.store,
get_auth_token=lambda: self.registryValue("webAuthToken"),
on_status_change=self._on_web_status_change,
get_requests_open=lambda: self.registryValue("requestsOpen"),
set_requests_open=lambda v: self.setRegistryValue("requestsOpen", v),
)
self._web_server.start()
def die(self):
self._web_server.stop()
super().die()
# ------------------------------------------------------------------
# Open/closed state
# ------------------------------------------------------------------
def _requests_open(self, channel=None, network=None):
"""Check if requests are open, respecting per-channel override."""
if channel and network:
override = self.registryValue("requestsOpenOverride", channel, network)
if override == "open":
return True
if override == "closed":
return False
return self.registryValue("requestsOpen")
# ------------------------------------------------------------------
# Rate limiting
# ------------------------------------------------------------------
def _check_rate_limit(self, network, nick):
"""Return True if user is within rate limits, False if exceeded."""
max_req = self.registryValue("maxRequestsPerUser")
if max_req == 0:
return True
window = self.registryValue("rateLimitWindow")
now = time.time()
key = (network, nick.lower())
with self._rate_lock:
timestamps = self._rate_limits.get(key, [])
timestamps = [t for t in timestamps if now - t < window]
self._rate_limits[key] = timestamps
if len(timestamps) >= max_req:
return False
timestamps.append(now)
return True
# ------------------------------------------------------------------
# Ignore list
# ------------------------------------------------------------------
def _is_ignored(self, nick, host):
ignored = self.registryValue("ignoredUsers")
nick_lower = nick.lower()
for entry in ignored:
entry_lower = entry.lower()
if entry_lower == nick_lower:
return True
if "!" in entry or "@" in entry:
if entry_lower in host.lower():
return True
return False
# ------------------------------------------------------------------
# Disambiguation / pending choices
# ------------------------------------------------------------------
def _store_choices(self, network, channel, nick, tracks):
key = (network, channel.lower(), nick.lower())
with self._choices_lock:
self._pending_choices[key] = {
"tracks": tracks,
"expires": time.time() + CHOICE_TIMEOUT,
}
def _get_choices(self, network, channel, nick):
key = (network, channel.lower(), nick.lower())
with self._choices_lock:
entry = self._pending_choices.get(key)
if entry and time.time() < entry["expires"]:
return entry["tracks"]
self._pending_choices.pop(key, None)
return None
def _clear_choices(self, network, channel, nick):
key = (network, channel.lower(), nick.lower())
with self._choices_lock:
self._pending_choices.pop(key, None)
def _present_choices(self, irc, msg, tracks, max_choices):
display = tracks[:max_choices]
parts = []
for i, t in enumerate(display, 1):
parts.append(f"[{i}] {t.display()}")
irc.reply(
" | ".join(parts) + " -- Reply with a number to pick, or 'cancel'.",
prefixNick=True,
)
self._store_choices(irc.network, msg.channel, msg.nick, display)
# ------------------------------------------------------------------
# Core request submission
# ------------------------------------------------------------------
def _submit_request(self, irc, msg, track, alternates=None):
alt_json = ""
if alternates:
alt_json = json.dumps([t.to_dict() for t in alternates])
req = SongRequestModel(
id=None,
itunes_track_id=track.track_id,
title=track.title,
artist=track.artist,
album=track.album,
artwork_url=track.artwork_url,
apple_music_url=track.apple_music_url,
requester_nick=msg.nick,
requester_host=msg.prefix or "",
channel=msg.channel or "",
network=irc.network,
status="pending",
created_at=0,
updated_at=0,
alternates_json=alt_json,
)
req = self.store.add(req)
card = render_request_card(req)
self._web_server.publish("request-new", card)
if not self.registryValue("quietQueued", msg.channel, irc.network):
irc.reply(
f"Queued: {track.display()} \x02|\x02 {track.apple_music_url}",
prefixNick=True,
)
def _lookup_and_submit(self, irc, msg, query):
"""Search iTunes, handle disambiguation or direct submit."""
if not self._requests_open(msg.channel, irc.network):
irc.reply("Requests are currently closed.", prefixNick=True)
return
if not self._check_rate_limit(irc.network, msg.nick):
irc.reply("You've hit the request limit. Please wait a while.", prefixNick=True)
return
max_choices = self.registryValue("maxChoices")
tracks = itunes.search(query, limit=max_choices + 2)
if not tracks:
return
if len(tracks) == 1:
self._submit_request(irc, msg, tracks[0])
else:
self._present_choices(irc, msg, tracks, max_choices)
# ------------------------------------------------------------------
# IRC announce on web status changes
# ------------------------------------------------------------------
def _on_web_status_change(self, req):
if not self.registryValue("announceStatus"):
return
if not req.channel or not req.network:
return
status_config = {
"approved": "announceApproved",
"rejected": "announceRejected",
"played": "announceNowPlaying",
}
config_key = status_config.get(req.status)
if not config_key:
return
if not self.registryValue(config_key, req.channel, req.network):
return
status_labels = {
"approved": "APPROVED",
"rejected": "REJECTED",
"played": "NOW PLAYING",
}
label = status_labels[req.status]
text = f"[{label}] {req.title} - {req.artist} (requested by {req.requester_nick})"
for irc in world.ircs:
if irc.network == req.network:
try:
irc.queueMsg(ircmsgs.privmsg(req.channel, text))
except Exception:
log.exception("SongRequest: Failed to announce status change")
break
# ------------------------------------------------------------------
# doPrivmsg — passive detection + choice picking
# ------------------------------------------------------------------
def doPrivmsg(self, irc, msg):
if not msg.channel:
return
if msg.nick == irc.nick:
return
if self._is_ignored(msg.nick, msg.prefix or ""):
return
text = msg.args[1] if len(msg.args) > 1 else ""
if not text:
return
stripped = text.strip()
if stripped.isdigit():
tracks = self._get_choices(irc.network, msg.channel, msg.nick)
if tracks:
idx = int(stripped) - 1
if 0 <= idx < len(tracks):
self._clear_choices(irc.network, msg.channel, msg.nick)
alternates = [t for i, t in enumerate(tracks) if i != idx]
self._submit_request(irc, msg, tracks[idx], alternates=alternates)
else:
irc.reply(
f"Pick a number between 1 and {len(tracks)}, or 'cancel'.",
prefixNick=True,
)
return
if stripped.lower() == "cancel":
if self._get_choices(irc.network, msg.channel, msg.nick):
self._clear_choices(irc.network, msg.channel, msg.nick)
irc.reply("Request cancelled.", prefixNick=True)
return
enabled_channels = [
c.lower() for c in self.registryValue("enabledChannels")
]
if msg.channel.lower() not in enabled_channels:
return
if not self.registryValue("passiveDetection", msg.channel, irc.network):
return
if not self._requests_open(msg.channel, irc.network):
return
if text.startswith(("!", ".", "@", "~", "?")):
return
if len(text) > 120:
return
m = SONG_PATTERN.match(text.strip())
if not m:
return
left = m.group("left").strip()
right = m.group("right").strip()
query = f"{left} {right}"
self._lookup_and_submit(irc, msg, query)
# ------------------------------------------------------------------
# Commands
# ------------------------------------------------------------------
def request(self, irc, msg, args, text):
"""<artist - title> or <title - artist>
Request a song. The bot will search Apple Music and queue it.
"""
channel = msg.channel
if channel and not self.registryValue("requestCommand", channel, irc.network):
irc.reply("Song requests are not enabled in this channel.", prefixNick=True)
return
if self._is_ignored(msg.nick, msg.prefix or ""):
return
self._lookup_and_submit(irc, msg, text)
request = wrap(request, ["text"])
def pick(self, irc, msg, args, number):
"""<number>
Pick a song from the disambiguation list.
"""
tracks = self._get_choices(irc.network, msg.channel or "", msg.nick)
if not tracks:
irc.reply("No pending choices. Use !request first.", prefixNick=True)
return
idx = number - 1
if not (0 <= idx < len(tracks)):
irc.reply(
f"Pick a number between 1 and {len(tracks)}.",
prefixNick=True,
)
return
self._clear_choices(irc.network, msg.channel or "", msg.nick)
alternates = [t for i, t in enumerate(tracks) if i != idx]
self._submit_request(irc, msg, tracks[idx], alternates=alternates)
pick = wrap(pick, ["positiveInt"])
def ignore(self, irc, msg, args, nick):
"""<nick>
Add a nick to the song request ignore list. Requires admin capability.
"""
current = list(self.registryValue("ignoredUsers"))
if nick.lower() in [n.lower() for n in current]:
irc.reply(f"{nick} is already ignored.", prefixNick=True)
return
current.append(nick)
self.setRegistryValue("ignoredUsers", current)
irc.reply(f"{nick} added to ignore list.", prefixNick=True)
ignore = wrap(ignore, ["admin", "nick"])
def unignore(self, irc, msg, args, nick):
"""<nick>
Remove a nick from the song request ignore list. Requires admin capability.
"""
current = list(self.registryValue("ignoredUsers"))
filtered = [n for n in current if n.lower() != nick.lower()]
if len(filtered) == len(current):
irc.reply(f"{nick} is not on the ignore list.", prefixNick=True)
return
self.setRegistryValue("ignoredUsers", filtered)
irc.reply(f"{nick} removed from ignore list.", prefixNick=True)
unignore = wrap(unignore, ["admin", "nick"])
def requeststats(self, irc, msg, args):
"""(takes no arguments)
Show song request queue statistics.
"""
pending = len(self.store.get_pending())
approved = len(self.store.get_approved())
history = len(self.store.get_history(limit=1000))
irc.reply(
f"Requests: {pending} pending, {approved} approved, {history} total",
prefixNick=True,
)
requeststats = wrap(requeststats, [])
def openrequests(self, irc, msg, args, channel):
"""[<channel>]
Open song requests globally, or for the given channel. Requires admin.
"""
if channel:
self.setRegistryValue("requestsOpenOverride", "open", channel, irc.network)
irc.reply(f"Requests are now open in {channel}.", prefixNick=True)
else:
self.setRegistryValue("requestsOpen", True)
irc.reply("Requests are now open globally.", prefixNick=True)
openrequests = wrap(openrequests, ["admin", optional("channel")])
def closerequests(self, irc, msg, args, channel):
"""[<channel>]
Close song requests globally, or for the given channel. Requires admin.
"""
if channel:
self.setRegistryValue("requestsOpenOverride", "closed", channel, irc.network)
irc.reply(f"Requests are now closed in {channel}.", prefixNick=True)
else:
self.setRegistryValue("requestsOpen", False)
irc.reply("Requests are now closed globally.", prefixNick=True)
closerequests = wrap(closerequests, ["admin", optional("channel")])
def clearhistory(self, irc, msg, args):
"""(takes no arguments)
Clear all played and rejected requests from history. Requires admin.
"""
count = self.store.clear_history()
self._web_server.publish("history-cleared", "")
irc.reply(f"Cleared {count} request(s) from history.", prefixNick=True)
clearhistory = wrap(clearhistory, ["admin"])
Class = SongRequest

1
SongRequest/static/htmx.min.js vendored Normal file

File diff suppressed because one or more lines are too long

196
SongRequest/store.py Normal file
View File

@@ -0,0 +1,196 @@
import os
import sqlite3
import time
import threading
from dataclasses import dataclass, asdict
from typing import List, Optional
import supybot.log as log
STATUS_PENDING = "pending"
STATUS_APPROVED = "approved"
STATUS_REJECTED = "rejected"
STATUS_PLAYED = "played"
VALID_STATUSES = {STATUS_PENDING, STATUS_APPROVED, STATUS_REJECTED, STATUS_PLAYED}
SCHEMA = """
CREATE TABLE IF NOT EXISTS requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
itunes_track_id INTEGER NOT NULL,
title TEXT NOT NULL,
artist TEXT NOT NULL,
album TEXT NOT NULL DEFAULT '',
artwork_url TEXT NOT NULL DEFAULT '',
apple_music_url TEXT NOT NULL DEFAULT '',
requester_nick TEXT NOT NULL,
requester_host TEXT NOT NULL DEFAULT '',
channel TEXT NOT NULL,
network TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
alternates_json TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_requests_status ON requests(status);
CREATE INDEX IF NOT EXISTS idx_requests_channel ON requests(channel);
"""
MIGRATIONS = [
"ALTER TABLE requests ADD COLUMN alternates_json TEXT NOT NULL DEFAULT ''",
]
@dataclass
class SongRequest:
id: Optional[int]
itunes_track_id: int
title: str
artist: str
album: str
artwork_url: str
apple_music_url: str
requester_nick: str
requester_host: str
channel: str
network: str
status: str
created_at: float
updated_at: float
alternates_json: str = ""
def to_dict(self) -> dict:
return asdict(self)
class RequestStore:
def __init__(self, db_path: str):
self._db_path = db_path
self._lock = threading.Lock()
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
self._init_db()
def _init_db(self):
with self._connect() as conn:
conn.executescript(SCHEMA)
for migration in MIGRATIONS:
try:
conn.execute(migration)
except sqlite3.OperationalError:
pass
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
return conn
def add(self, req: SongRequest) -> SongRequest:
now = time.time()
req.created_at = now
req.updated_at = now
req.status = STATUS_PENDING
with self._lock, self._connect() as conn:
cur = conn.execute(
"""INSERT INTO requests
(itunes_track_id, title, artist, album, artwork_url,
apple_music_url, requester_nick, requester_host,
channel, network, status, created_at, updated_at,
alternates_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
req.itunes_track_id, req.title, req.artist, req.album,
req.artwork_url, req.apple_music_url,
req.requester_nick, req.requester_host,
req.channel, req.network, req.status,
req.created_at, req.updated_at,
req.alternates_json,
),
)
req.id = cur.lastrowid
return req
def update_status(self, request_id: int, new_status: str) -> Optional[SongRequest]:
if new_status not in VALID_STATUSES:
raise ValueError(f"Invalid status: {new_status}")
now = time.time()
with self._lock, self._connect() as conn:
conn.execute(
"UPDATE requests SET status = ?, updated_at = ? WHERE id = ?",
(new_status, now, request_id),
)
return self._get_by_id(conn, request_id)
def get(self, request_id: int) -> Optional[SongRequest]:
with self._lock, self._connect() as conn:
return self._get_by_id(conn, request_id)
def _get_by_id(self, conn, request_id: int) -> Optional[SongRequest]:
row = conn.execute(
"SELECT * FROM requests WHERE id = ?", (request_id,)
).fetchone()
return self._row_to_request(row) if row else None
def get_by_status(self, *statuses: str) -> List[SongRequest]:
placeholders = ",".join("?" for _ in statuses)
with self._lock, self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM requests WHERE status IN ({placeholders}) ORDER BY created_at ASC",
statuses,
).fetchall()
return [self._row_to_request(r) for r in rows]
def get_pending(self) -> List[SongRequest]:
return self.get_by_status(STATUS_PENDING)
def get_approved(self) -> List[SongRequest]:
return self.get_by_status(STATUS_APPROVED)
def get_active(self) -> List[SongRequest]:
return self.get_by_status(STATUS_PENDING, STATUS_APPROVED)
def get_history(self, limit: int = 50) -> List[SongRequest]:
with self._lock, self._connect() as conn:
rows = conn.execute(
"SELECT * FROM requests ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [self._row_to_request(r) for r in rows]
def swap_alternate(self, request_id: int, alt: dict) -> None:
"""Replace a request's track data with an alternate's data."""
now = time.time()
with self._lock, self._connect() as conn:
conn.execute(
"""UPDATE requests SET
itunes_track_id = ?, title = ?, artist = ?, album = ?,
artwork_url = ?, apple_music_url = ?, updated_at = ?
WHERE id = ?""",
(
alt.get("track_id", 0),
alt.get("title", ""),
alt.get("artist", ""),
alt.get("album", ""),
alt.get("artwork_url", ""),
alt.get("apple_music_url", ""),
now,
request_id,
),
)
def clear_history(self) -> int:
with self._lock, self._connect() as conn:
cur = conn.execute(
"DELETE FROM requests WHERE status IN (?, ?)",
(STATUS_PLAYED, STATUS_REJECTED),
)
return cur.rowcount
def delete(self, request_id: int) -> bool:
with self._lock, self._connect() as conn:
cur = conn.execute("DELETE FROM requests WHERE id = ?", (request_id,))
return cur.rowcount > 0
@staticmethod
def _row_to_request(row: sqlite3.Row) -> SongRequest:
return SongRequest(**dict(row))

View File

@@ -0,0 +1,594 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Song Requests</title>
<script src="/static/htmx.min.js"></script>
<style>
:root {
--bg: #0f0f0f;
--surface: #1a1a1a;
--surface-hover: #242424;
--border: #2a2a2a;
--text: #e8e8e8;
--text-muted: #888;
--accent: #fa2d48;
--accent-hover: #ff4562;
--approve: #2dd4a0;
--approve-hover: #4aeab8;
--reject: #fa2d48;
--reject-hover: #ff4562;
--played: #6366f1;
--played-hover: #818cf8;
--pending-bg: rgba(250, 45, 72, 0.08);
--approved-bg: rgba(45, 212, 160, 0.08);
--rejected-bg: rgba(250, 45, 72, 0.05);
--played-bg: rgba(99, 102, 241, 0.08);
--radius: 12px;
}
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: system-ui, -apple-system, sans-serif;
background: var(--bg);
color: var(--text);
line-height: 1.5;
min-height: 100vh;
}
.container {
max-width: 900px;
margin: 0 auto;
padding: 2rem 1.5rem;
}
header {
display: flex;
align-items: center;
gap: 1rem;
margin-bottom: 2.5rem;
padding-bottom: 1.5rem;
border-bottom: 1px solid var(--border);
}
header .logo {
width: 40px;
height: 40px;
background: linear-gradient(135deg, var(--accent), #fc6076);
border-radius: 10px;
display: flex;
align-items: center;
justify-content: center;
font-size: 1.2rem;
flex-shrink: 0;
}
header h1 {
font-size: 1.5rem;
font-weight: 600;
letter-spacing: -0.02em;
}
.toggle-label {
display: flex;
align-items: center;
cursor: pointer;
margin-left: auto;
}
.toggle-label input { display: none; }
.toggle-slider {
width: 36px;
height: 20px;
background: var(--reject);
border-radius: 10px;
position: relative;
transition: background 0.2s;
}
.toggle-slider::after {
content: '';
position: absolute;
width: 16px;
height: 16px;
background: #fff;
border-radius: 50%;
top: 2px;
left: 2px;
transition: transform 0.2s;
}
.toggle-label input:checked + .toggle-slider { background: var(--approve); }
.toggle-label input:checked + .toggle-slider::after { transform: translateX(16px); }
.closed-banner {
text-align: center;
padding: 0.5rem;
margin-bottom: 1rem;
background: rgba(250, 45, 72, 0.12);
border: 1px solid rgba(250, 45, 72, 0.2);
border-radius: 8px;
color: var(--reject);
font-size: 0.875rem;
font-weight: 500;
}
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
margin-left: 0.75rem;
transition: background 0.3s;
}
.status-dot.connected { background: var(--approve); animation: pulse 2s ease-in-out infinite; }
.status-dot.disconnected { background: var(--reject); animation: none; }
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.4; }
}
.tabs {
display: flex;
gap: 0.25rem;
margin-bottom: 1.5rem;
background: var(--surface);
padding: 4px;
border-radius: 10px;
}
.tab {
flex: 1;
padding: 0.6rem 1rem;
background: transparent;
border: none;
color: var(--text-muted);
font-size: 0.875rem;
font-weight: 500;
cursor: pointer;
border-radius: 8px;
transition: all 0.15s ease;
}
.tab:hover { color: var(--text); }
.tab.active { background: var(--surface-hover); color: var(--text); }
.request-list {
display: flex;
flex-direction: column;
gap: 0.75rem;
}
.request-card {
display: flex;
gap: 1rem;
padding: 1rem;
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
transition: all 0.2s ease;
}
.request-card:hover { border-color: #3a3a3a; }
.request-card.status-pending { background: var(--pending-bg); border-color: rgba(250, 45, 72, 0.15); }
.request-card.status-approved { background: var(--approved-bg); border-color: rgba(45, 212, 160, 0.15); }
.request-card.status-rejected { background: var(--rejected-bg); border-color: rgba(250, 45, 72, 0.1); opacity: 0.6; }
.request-card.status-played { background: var(--played-bg); border-color: rgba(99, 102, 241, 0.15); }
.album-art {
width: 80px;
height: 80px;
border-radius: 8px;
object-fit: cover;
flex-shrink: 0;
background: var(--surface-hover);
}
.card-body {
flex: 1;
min-width: 0;
}
.card-title {
font-weight: 600;
font-size: 1rem;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.card-artist {
color: var(--accent);
font-size: 0.875rem;
font-weight: 500;
}
.card-album {
color: var(--text-muted);
font-size: 0.8125rem;
}
.card-link {
text-decoration: none;
color: inherit;
display: flex;
cursor: pointer !important;
}
.card-link:hover { border-color: #444; }
.btn-sm {
padding: 0.2rem 0.5rem;
font-size: 0.6875rem;
margin-left: auto;
flex-shrink: 0;
}
.card-meta {
color: var(--text-muted);
font-size: 0.75rem;
margin-top: 0.25rem;
}
.card-status-badge {
display: inline-block;
font-size: 0.6875rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.05em;
padding: 0.15rem 0.5rem;
border-radius: 6px;
margin-top: 0.35rem;
background: var(--surface-hover);
color: var(--text-muted);
}
.status-pending .card-status-badge { background: rgba(250, 45, 72, 0.15); color: var(--accent); }
.status-approved .card-status-badge { background: rgba(45, 212, 160, 0.15); color: var(--approve); }
.status-rejected .card-status-badge { background: rgba(250, 45, 72, 0.12); color: var(--reject); }
.status-played .card-status-badge { background: rgba(99, 102, 241, 0.15); color: var(--played); }
.card-actions {
display: flex;
gap: 0.5rem;
margin-top: 0.5rem;
}
.btn {
position: relative;
z-index: 1;
padding: 0.4rem 0.85rem;
border: none;
border-radius: 8px;
font-size: 0.8125rem;
font-weight: 500;
cursor: pointer;
transition: all 0.15s ease;
color: #fff;
}
.btn-approve { background: var(--approve); }
.btn-approve:hover { background: var(--approve-hover); }
.btn-reject { background: var(--reject); }
.btn-reject:hover { background: var(--reject-hover); }
.btn-played { background: var(--played); }
.btn-played:hover { background: var(--played-hover); }
.empty-state {
text-align: center;
padding: 4rem 2rem;
color: var(--text-muted);
}
.empty-state p { font-size: 1rem; }
.empty-state .hint { font-size: 0.8125rem; margin-top: 0.5rem; }
.section-header {
display: flex;
align-items: center;
justify-content: space-between;
margin-bottom: 0.75rem;
}
.section-actions {
display: flex;
gap: 0.5rem;
}
.btn-secondary {
background: var(--surface-hover);
color: var(--text-muted);
font-size: 0.75rem;
padding: 0.3rem 0.65rem;
}
.btn-secondary:hover { background: var(--border); color: var(--text); }
.btn-danger-text:hover { color: var(--reject); }
.section-title {
font-size: 0.75rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.08em;
color: var(--text-muted);
}
.section + .section { margin-top: 2rem; }
.card-wrapper {
border-radius: var(--radius);
}
.alternates {
padding: 0.5rem 1rem 0.75rem;
background: var(--surface);
border: 1px solid var(--border);
border-top: none;
border-radius: 0 0 var(--radius) var(--radius);
}
.alternates summary {
cursor: pointer;
color: var(--text-muted);
font-size: 0.75rem;
font-weight: 500;
user-select: none;
}
.alt-list {
display: flex;
flex-direction: column;
gap: 0.5rem;
margin-top: 0.5rem;
}
.alt-card {
display: flex;
align-items: center;
gap: 0.625rem;
padding: 0.375rem 0.5rem;
border-radius: 8px;
text-decoration: none;
color: inherit;
cursor: pointer;
transition: background 0.15s;
}
.alt-card:hover { background: var(--surface-hover); }
.alt-art {
width: 36px;
height: 36px;
border-radius: 4px;
object-fit: cover;
flex-shrink: 0;
background: var(--surface-hover);
}
.alt-info {
display: flex;
flex-direction: column;
min-width: 0;
}
.alt-title {
font-size: 0.8125rem;
font-weight: 500;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.alt-artist {
font-size: 0.75rem;
color: var(--text-muted);
}
.alternates + .card-link { border-radius: var(--radius) var(--radius) 0 0; }
.card-wrapper .card-link { border-radius: var(--radius); }
.card-wrapper:has(.alternates) > .card-link { border-radius: var(--radius) var(--radius) 0 0; }
</style>
</head>
<body>
<div class="container">
<header>
<div class="logo">&#9835;</div>
<h1>Song Requests</h1>
<label class="toggle-label" title="Open/close requests">
<input type="checkbox" id="requests-toggle" checked onchange="toggleRequests(this.checked)" />
<span class="toggle-slider"></span>
</label>
<div id="status-dot" class="status-dot disconnected" title="Disconnected"></div>
</header>
<div id="closed-banner" class="closed-banner" style="display:none;">Requests are closed</div>
<div class="tabs">
<button class="tab active" onclick="showTab('queue')">Queue</button>
<button class="tab" onclick="showTab('history')">History</button>
</div>
<div id="queue-view">
<div class="section">
<div class="section-title">Pending</div>
<div id="pending-list" class="request-list"
hx-get="/api/requests?status=pending"
hx-trigger="load"
hx-swap="innerHTML">
</div>
</div>
<div class="section">
<div class="section-title">Approved</div>
<div id="approved-list" class="request-list"
hx-get="/api/requests?status=approved"
hx-trigger="load"
hx-swap="innerHTML">
</div>
</div>
</div>
<div id="history-view" style="display:none;">
<div class="section">
<div class="section-header">
<div class="section-title">Recent History</div>
<div class="section-actions">
<a id="export-btn" class="btn btn-secondary" href="#" onclick="exportMarkdown(event)">Export .md</a>
<button class="btn btn-secondary btn-danger-text" onclick="clearHistory()">Clear History</button>
</div>
</div>
<div id="history-list" class="request-list"
hx-get="/api/requests?status=history"
hx-trigger="revealed"
hx-swap="innerHTML">
</div>
</div>
</div>
</div>
<script>
var AUTH_TOKEN = "{{AUTH_TOKEN}}";
var ws = null;
var reconnectDelay = 1000;
var maxReconnectDelay = 30000;
var statusDot = document.getElementById('status-dot');
function showTab(tab) {
document.querySelectorAll('.tab').forEach(function(t) { t.classList.remove('active'); });
event.target.classList.add('active');
document.getElementById('queue-view').style.display = tab === 'queue' ? '' : 'none';
document.getElementById('history-view').style.display = tab === 'history' ? '' : 'none';
}
document.body.addEventListener('htmx:configRequest', function(e) {
if (AUTH_TOKEN) {
e.detail.headers['X-Auth-Token'] = AUTH_TOKEN;
}
});
function applyRequestsOpen(isOpen) {
var toggle = document.getElementById('requests-toggle');
var banner = document.getElementById('closed-banner');
toggle.checked = isOpen;
banner.style.display = isOpen ? 'none' : '';
}
function toggleRequests(isOpen) {
var url = '/api/status';
fetch(url, {
method: 'POST',
headers: Object.assign({'Content-Type': 'application/json'},
AUTH_TOKEN ? {'X-Auth-Token': AUTH_TOKEN} : {}),
body: JSON.stringify({ open: isOpen })
});
applyRequestsOpen(isOpen);
}
(function fetchInitialStatus() {
var url = '/api/status';
if (AUTH_TOKEN) url += '?token=' + encodeURIComponent(AUTH_TOKEN);
fetch(url).then(function(r) { return r.json(); }).then(function(data) {
applyRequestsOpen(data.open);
}).catch(function() {});
})();
function exportMarkdown(e) {
e.preventDefault();
e.stopPropagation();
var url = '/api/export/markdown';
if (AUTH_TOKEN) url += '?token=' + encodeURIComponent(AUTH_TOKEN);
window.location.href = url;
}
function clearHistory() {
if (!confirm('Clear all played and rejected requests from history?')) return;
var url = '/api/history/clear';
if (AUTH_TOKEN) url += '?token=' + encodeURIComponent(AUTH_TOKEN);
fetch(url, { method: 'POST', headers: AUTH_TOKEN ? { 'X-Auth-Token': AUTH_TOKEN } : {} })
.then(function() {
var hl = document.getElementById('history-list');
if (hl) hl.innerHTML = '';
});
}
function connectWS() {
var proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
var url = proto + '//' + location.host + '/ws';
if (AUTH_TOKEN) url += '?token=' + encodeURIComponent(AUTH_TOKEN);
ws = new WebSocket(url);
ws.onopen = function() {
reconnectDelay = 1000;
statusDot.className = 'status-dot connected';
statusDot.title = 'Connected';
};
ws.onclose = function() {
statusDot.className = 'status-dot disconnected';
statusDot.title = 'Disconnected — reconnecting...';
setTimeout(connectWS, reconnectDelay);
reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
};
ws.onerror = function() {
ws.close();
};
ws.onmessage = function(e) {
var msg;
try { msg = JSON.parse(e.data); } catch (_) { return; }
if (msg.event === 'history-cleared') {
var hl = document.getElementById('history-list');
if (hl) hl.innerHTML = '';
return;
}
if (msg.event === 'requests-status') {
applyRequestsOpen(msg.open);
return;
}
var cardHtml = msg.html;
if (!cardHtml) return;
var temp = document.createElement('div');
temp.innerHTML = cardHtml.trim();
var newCard = temp.firstElementChild;
if (!newCard) return;
var status = newCard.getAttribute('data-status');
var sectionMap = {
'pending': 'pending-list',
'approved': 'approved-list',
'played': 'history-list',
'rejected': 'history-list'
};
var targetId = sectionMap[status] || 'pending-list';
if (msg.event === 'request-new') {
var list = document.getElementById(targetId);
if (list) {
list.insertBefore(newCard, list.firstChild);
htmx.process(newCard);
}
} else if (msg.event === 'request-update') {
var existing = document.getElementById(newCard.id);
if (existing) existing.remove();
var dest = document.getElementById(targetId);
if (dest) {
dest.insertBefore(newCard, dest.firstChild);
htmx.process(newCard);
}
}
};
}
connectWS();
</script>
</body>
</html>

250
SongRequest/test.py Normal file
View File

@@ -0,0 +1,250 @@
import os
import tempfile
import time
import unittest
from unittest.mock import patch, MagicMock
from .store import RequestStore, SongRequest as SongRequestModel, STATUS_PENDING, STATUS_APPROVED, STATUS_PLAYED
from .itunes import Track, search
from .web import render_request_card, WebServer
# ---------------------------------------------------------------
# Store tests
# ---------------------------------------------------------------
class TestRequestStore(unittest.TestCase):
def setUp(self):
self._tmpdir = tempfile.mkdtemp()
self.db_path = os.path.join(self._tmpdir, "test.db")
self.store = RequestStore(self.db_path)
def _make_request(self, **overrides):
defaults = dict(
id=None,
itunes_track_id=123456,
title="Bohemian Rhapsody",
artist="Queen",
album="A Night at the Opera",
artwork_url="https://example.com/art.jpg",
apple_music_url="https://music.apple.com/track/123",
requester_nick="testuser",
requester_host="testuser!user@host",
channel="#music",
network="freenode",
status=STATUS_PENDING,
created_at=0,
updated_at=0,
)
defaults.update(overrides)
return SongRequestModel(**defaults)
def test_add_and_get(self):
req = self._make_request()
saved = self.store.add(req)
self.assertIsNotNone(saved.id)
self.assertGreater(saved.created_at, 0)
fetched = self.store.get(saved.id)
self.assertEqual(fetched.title, "Bohemian Rhapsody")
self.assertEqual(fetched.status, STATUS_PENDING)
def test_update_status(self):
req = self.store.add(self._make_request())
updated = self.store.update_status(req.id, STATUS_APPROVED)
self.assertEqual(updated.status, STATUS_APPROVED)
self.assertGreater(updated.updated_at, req.created_at)
def test_get_by_status(self):
self.store.add(self._make_request(title="Song A"))
self.store.add(self._make_request(title="Song B"))
req_c = self.store.add(self._make_request(title="Song C"))
self.store.update_status(req_c.id, STATUS_APPROVED)
pending = self.store.get_pending()
self.assertEqual(len(pending), 2)
approved = self.store.get_approved()
self.assertEqual(len(approved), 1)
self.assertEqual(approved[0].title, "Song C")
def test_get_active(self):
self.store.add(self._make_request(title="Song A"))
req_b = self.store.add(self._make_request(title="Song B"))
self.store.update_status(req_b.id, STATUS_APPROVED)
req_c = self.store.add(self._make_request(title="Song C"))
self.store.update_status(req_c.id, STATUS_PLAYED)
active = self.store.get_active()
self.assertEqual(len(active), 2)
titles = {r.title for r in active}
self.assertEqual(titles, {"Song A", "Song B"})
def test_get_history(self):
for i in range(5):
self.store.add(self._make_request(title=f"Song {i}"))
history = self.store.get_history(limit=3)
self.assertEqual(len(history), 3)
def test_delete(self):
req = self.store.add(self._make_request())
self.assertTrue(self.store.delete(req.id))
self.assertIsNone(self.store.get(req.id))
def test_invalid_status_raises(self):
req = self.store.add(self._make_request())
with self.assertRaises(ValueError):
self.store.update_status(req.id, "invalid_status")
# ---------------------------------------------------------------
# iTunes client tests (mocked HTTP)
# ---------------------------------------------------------------
class TestiTunesClient(unittest.TestCase):
SAMPLE_RESPONSE = {
"resultCount": 2,
"results": [
{
"wrapperType": "track",
"kind": "song",
"trackId": 1440935467,
"trackName": "Bohemian Rhapsody",
"artistName": "Queen",
"collectionName": "A Night at the Opera",
"artworkUrl100": "https://is1-ssl.mzstatic.com/image/100x100.jpg",
"trackViewUrl": "https://music.apple.com/us/album/bohemian-rhapsody/1440935467",
"previewUrl": "https://audio-ssl.itunes.apple.com/preview.m4a",
},
{
"wrapperType": "track",
"kind": "song",
"trackId": 1440935468,
"trackName": "Bohemian Rhapsody (Live)",
"artistName": "Queen",
"collectionName": "Live at Wembley",
"artworkUrl100": "https://is1-ssl.mzstatic.com/image/100x100b.jpg",
"trackViewUrl": "https://music.apple.com/us/album/bohemian-rhapsody-live/1440935468",
},
],
}
@patch("SongRequest.itunes.urllib.request.urlopen")
def test_search_returns_tracks(self, mock_urlopen):
import json
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps(self.SAMPLE_RESPONSE).encode()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_urlopen.return_value = mock_resp
tracks = search("Bohemian Rhapsody Queen", limit=5)
self.assertEqual(len(tracks), 2)
self.assertEqual(tracks[0].title, "Bohemian Rhapsody")
self.assertEqual(tracks[0].artist, "Queen")
self.assertIn("600x600", tracks[0].artwork_url)
@patch("SongRequest.itunes.urllib.request.urlopen")
def test_search_handles_error(self, mock_urlopen):
mock_urlopen.side_effect = Exception("network error")
tracks = search("test query")
self.assertEqual(tracks, [])
# ---------------------------------------------------------------
# Track dataclass tests
# ---------------------------------------------------------------
class TestTrack(unittest.TestCase):
def test_display(self):
t = Track(
track_id=1, title="Song", artist="Artist",
album="Album", artwork_url="", apple_music_url="",
)
self.assertEqual(t.display(), "Song - Artist (Album)")
def test_from_itunes_artwork_upscale(self):
item = {
"trackId": 1,
"trackName": "Test",
"artistName": "Test Artist",
"collectionName": "Test Album",
"artworkUrl100": "https://example.com/100x100bb.jpg",
"trackViewUrl": "https://music.apple.com/test",
}
t = Track.from_itunes(item)
self.assertIn("600x600", t.artwork_url)
# ---------------------------------------------------------------
# WebServer lifecycle test
# ---------------------------------------------------------------
class TestWebServerLifecycle(unittest.TestCase):
def test_start_and_stop(self):
tmpdir = tempfile.mkdtemp()
db_path = os.path.join(tmpdir, "test.db")
store = RequestStore(db_path)
server = WebServer(
host="127.0.0.1",
port=0, # let OS pick a free port
store=store,
get_auth_token=lambda: "",
)
# Just verify the object can be created and start/stop don't crash
# We don't actually bind port 0 through aiohttp easily, so test
# the thread lifecycle with a known port
server._host = "127.0.0.1"
server._port = 18765
server.start()
time.sleep(0.5)
self.assertTrue(server._thread.is_alive())
server.stop()
time.sleep(0.5)
self.assertFalse(server._thread.is_alive())
# ---------------------------------------------------------------
# Render tests
# ---------------------------------------------------------------
class TestRenderRequestCard(unittest.TestCase):
def test_pending_has_approve_reject(self):
req = SongRequestModel(
id=1, itunes_track_id=1, title="Test Song", artist="Test Artist",
album="Test Album", artwork_url="https://example.com/art.jpg",
apple_music_url="https://music.apple.com/test",
requester_nick="user1", requester_host="", channel="#test",
network="net", status="pending", created_at=time.time(), updated_at=time.time(),
)
result = render_request_card(req)
self.assertIn("btn-approve", result)
self.assertIn("btn-reject", result)
self.assertNotIn("btn-played", result)
def test_approved_has_played_button(self):
req = SongRequestModel(
id=2, itunes_track_id=1, title="Test", artist="Artist",
album="Album", artwork_url="", apple_music_url="",
requester_nick="user2", requester_host="", channel="#test",
network="net", status="approved", created_at=time.time(), updated_at=time.time(),
)
result = render_request_card(req)
self.assertIn("btn-played", result)
self.assertNotIn("btn-approve", result)
def test_played_has_no_actions(self):
req = SongRequestModel(
id=3, itunes_track_id=1, title="Test", artist="Artist",
album="Album", artwork_url="", apple_music_url="",
requester_nick="user3", requester_host="", channel="#test",
network="net", status="played", created_at=time.time(), updated_at=time.time(),
)
result = render_request_card(req)
self.assertNotIn("card-actions", result)
def test_no_songrequest_prefix_in_urls(self):
req = SongRequestModel(
id=4, itunes_track_id=1, title="Test", artist="Artist",
album="Album", artwork_url="", apple_music_url="",
requester_nick="user4", requester_host="", channel="#test",
network="net", status="pending", created_at=time.time(), updated_at=time.time(),
)
result = render_request_card(req)
self.assertIn('hx-post="/api/requests/4/approve"', result)
self.assertNotIn("/songrequest/", result)

373
SongRequest/web.py Normal file
View File

@@ -0,0 +1,373 @@
import asyncio
import datetime
import html
import json
import os
import threading
import time
import weakref
from typing import Optional, Callable
import aiohttp
from aiohttp import web
import supybot.log as log
from .store import SongRequest as SongRequestModel, VALID_STATUSES
TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), "templates")
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
def _render_alternates(request_id: int, alternates_json: str, status: str) -> str:
"""Render alternate tracks as a collapsible details section."""
if not alternates_json:
return ""
try:
alts = json.loads(alternates_json)
except (json.JSONDecodeError, TypeError):
return ""
if not alts:
return ""
esc = html.escape
show_approve = status == "pending"
items = []
for idx, alt in enumerate(alts):
artwork = esc(alt.get("artwork_url", "").replace("600x600", "100x100"))
title = esc(alt.get("title", ""))
artist = esc(alt.get("artist", ""))
url = esc(alt.get("apple_music_url", ""))
approve_btn = ""
if show_approve:
approve_btn = (
f'<button hx-post="/api/requests/{request_id}/approve-alt/{idx}"'
f' hx-swap="outerHTML" hx-target="#request-{request_id}"'
f' class="btn btn-approve btn-sm"'
f' onclick="event.stopPropagation();">Approve</button>'
)
items.append(
f'<div class="alt-card" onclick="event.stopPropagation(); window.open(\'{url}\',\'_blank\')">'
f'<img class="alt-art" src="{artwork}" alt="" loading="lazy" />'
f'<div class="alt-info"><span class="alt-title">{title}</span>'
f'<span class="alt-artist">{artist}</span></div>'
f'{approve_btn}</div>'
)
return (
'<details class="alternates" onclick="event.stopPropagation();">'
f'<summary>{len(items)} other match{"es" if len(items) != 1 else ""}</summary>'
'<div class="alt-list">' + "".join(items) + '</div></details>'
)
def render_request_card(req: SongRequestModel) -> str:
"""Render a single request as an HTML card fragment."""
status_classes = {
"pending": "status-pending",
"approved": "status-approved",
"rejected": "status-rejected",
"played": "status-played",
}
status_cls = status_classes.get(req.status, "")
esc = html.escape
created = time.strftime("%H:%M:%S", time.localtime(req.created_at))
actions = ""
if req.status == "pending":
actions = f"""
<div class="card-actions">
<button hx-post="/api/requests/{req.id}/approve"
hx-swap="outerHTML" hx-target="#request-{req.id}"
class="btn btn-approve" onclick="event.stopPropagation()">Approve</button>
<button hx-post="/api/requests/{req.id}/reject"
hx-swap="outerHTML" hx-target="#request-{req.id}"
class="btn btn-reject" onclick="event.stopPropagation()">Reject</button>
</div>"""
elif req.status == "approved":
actions = f"""
<div class="card-actions">
<button hx-post="/api/requests/{req.id}/played"
hx-swap="outerHTML" hx-target="#request-{req.id}"
class="btn btn-played" onclick="event.stopPropagation()">Mark Played</button>
</div>"""
alternates_html = _render_alternates(req.id, req.alternates_json, req.status)
return f"""<div id="request-{req.id}" class="card-wrapper" data-status="{esc(req.status)}">
<div class="card-link request-card {esc(status_cls)}"
onclick="window.open('{esc(req.apple_music_url)}','_blank')">
<img class="album-art" src="{esc(req.artwork_url)}" alt="Album art" loading="lazy" />
<div class="card-body">
<div class="card-title">{esc(req.title)}</div>
<div class="card-artist">{esc(req.artist)}</div>
<div class="card-album">{esc(req.album)}</div>
<div class="card-meta">Requested by {esc(req.requester_nick)} in {esc(req.channel)} at {created}</div>
<span class="card-status-badge">{esc(req.status)}</span>
{actions}
</div>
</div>
{alternates_html}
</div>"""
class WebServer:
"""Standalone aiohttp web server running in a background thread."""
def __init__(self, host: str, port: int, store, get_auth_token: Callable[[], str],
on_status_change: Optional[Callable] = None,
get_requests_open: Optional[Callable] = None,
set_requests_open: Optional[Callable] = None):
self._host = host
self._port = port
self._store = store
self._get_auth_token = get_auth_token
self._on_status_change = on_status_change
self._get_requests_open = get_requests_open or (lambda: True)
self._set_requests_open = set_requests_open or (lambda v: None)
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
self._runner: Optional[web.AppRunner] = None
self._ws_clients: weakref.WeakSet = weakref.WeakSet()
def start(self):
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
if self._loop and self._runner:
future = asyncio.run_coroutine_threadsafe(self._shutdown(), self._loop)
try:
future.result(timeout=5)
except Exception:
pass
if self._loop:
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread:
self._thread.join(timeout=5)
def publish(self, event_type: str, data: str):
"""Thread-safe publish from Limnoria threads into the async WebSocket broadcast."""
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(
asyncio.ensure_future,
self._broadcast(event_type, data),
)
def _run(self):
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._start_app())
self._loop.run_forever()
async def _start_app(self):
app = web.Application()
app.router.add_get("/", self._handle_dashboard)
app.router.add_get("/ws", self._handle_ws)
app.router.add_get("/api/requests", self._handle_api_get)
app.router.add_post("/api/requests/{request_id}/approve-alt/{alt_idx}", self._handle_approve_alt)
app.router.add_post("/api/requests/{request_id}/{action}", self._handle_api_action)
app.router.add_get("/api/export/markdown", self._handle_export_markdown)
app.router.add_get("/api/status", self._handle_get_status)
app.router.add_post("/api/status", self._handle_post_status)
app.router.add_post("/api/history/clear", self._handle_clear_history)
app.router.add_static("/static", STATIC_DIR)
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._host, self._port)
await site.start()
log.info(f"SongRequest: Web dashboard listening on {self._host}:{self._port}")
async def _shutdown(self):
for ws in set(self._ws_clients):
await ws.close()
if self._runner:
await self._runner.cleanup()
async def _broadcast(self, event_type: str, data: str):
msg = json.dumps({"event": event_type, "html": data})
dead = []
for ws in set(self._ws_clients):
try:
await ws.send_str(msg)
except (ConnectionResetError, Exception):
dead.append(ws)
for ws in dead:
self._ws_clients.discard(ws)
def _check_auth(self, request: web.Request) -> bool:
token = self._get_auth_token()
if not token:
return True
q_token = request.query.get("token", "")
h_token = request.headers.get("X-Auth-Token", "")
return q_token == token or h_token == token
async def _handle_dashboard(self, request: web.Request) -> web.Response:
template_path = os.path.join(TEMPLATES_DIR, "index.html")
try:
with open(template_path, "r", encoding="utf-8") as f:
content = f.read()
except FileNotFoundError:
return web.Response(text="Dashboard template not found", status=500)
token = self._get_auth_token()
content = content.replace("{{AUTH_TOKEN}}", html.escape(token or ""))
return web.Response(text=content, content_type="text/html")
async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
self._ws_clients.add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.ERROR:
break
finally:
self._ws_clients.discard(ws)
return ws
async def _handle_api_get(self, request: web.Request) -> web.Response:
status_filter = request.query.get("status")
if status_filter == "history":
reqs = self._store.get_by_status("played", "rejected")
elif status_filter and status_filter in VALID_STATUSES:
reqs = self._store.get_by_status(status_filter)
else:
reqs = self._store.get_active()
cards = "\n".join(render_request_card(r) for r in reqs)
return web.Response(text=cards, content_type="text/html")
async def _handle_api_action(self, request: web.Request) -> web.Response:
if not self._check_auth(request):
return web.Response(text="Forbidden", status=403)
try:
request_id = int(request.match_info["request_id"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
action = request.match_info.get("action", "")
status_map = {"approve": "approved", "reject": "rejected", "played": "played"}
new_status = status_map.get(action)
if not new_status:
return web.Response(text="Invalid action", status=400)
req = self._store.update_status(request_id, new_status)
if not req:
return web.Response(text="Request not found", status=404)
card_html = render_request_card(req)
await self._broadcast("request-update", card_html)
if self._on_status_change:
try:
self._on_status_change(req)
except Exception:
log.exception("SongRequest: on_status_change callback failed")
return web.Response(text=card_html, content_type="text/html")
async def _handle_approve_alt(self, request: web.Request) -> web.Response:
"""Approve an alternate track: swap it into the main request and approve."""
if not self._check_auth(request):
return web.Response(text="Forbidden", status=403)
try:
request_id = int(request.match_info["request_id"])
alt_idx = int(request.match_info["alt_idx"])
except (ValueError, KeyError):
return web.Response(text="Bad request", status=400)
req = self._store.get(request_id)
if not req:
return web.Response(text="Request not found", status=404)
try:
alts = json.loads(req.alternates_json) if req.alternates_json else []
except (json.JSONDecodeError, TypeError):
alts = []
if alt_idx < 0 or alt_idx >= len(alts):
return web.Response(text="Invalid alternate index", status=400)
alt = alts[alt_idx]
self._store.swap_alternate(request_id, alt)
req = self._store.update_status(request_id, "approved")
if not req:
return web.Response(text="Request not found", status=404)
card_html = render_request_card(req)
await self._broadcast("request-update", card_html)
if self._on_status_change:
try:
self._on_status_change(req)
except Exception:
log.exception("SongRequest: on_status_change callback failed")
return web.Response(text=card_html, content_type="text/html")
async def _handle_export_markdown(self, request: web.Request) -> web.Response:
if not self._check_auth(request):
return web.Response(text="Forbidden", status=403)
reqs = self._store.get_history(limit=5000)
today = datetime.date.today().isoformat()
lines = [f"# Song Requests Export ({today})", "", "| Title | Artist | Album | Requested By | Status | Time | Apple Music |",
"| --- | --- | --- | --- | --- | --- | --- |"]
for r in reqs:
ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(r.created_at))
lines.append(f"| {r.title} | {r.artist} | {r.album} | {r.requester_nick} | {r.status} | {ts} | [link]({r.apple_music_url}) |")
body = "\n".join(lines) + "\n"
return web.Response(
body=body,
content_type="text/markdown",
headers={"Content-Disposition": f'attachment; filename="song-requests-{today}.md"'},
)
async def _handle_get_status(self, request: web.Request) -> web.Response:
is_open = self._get_requests_open()
return web.json_response({"open": is_open})
async def _handle_post_status(self, request: web.Request) -> web.Response:
if not self._check_auth(request):
return web.Response(text="Forbidden", status=403)
try:
data = await request.json()
except Exception:
return web.Response(text="Bad request", status=400)
is_open = bool(data.get("open", True))
self._set_requests_open(is_open)
payload = json.dumps({"event": "requests-status", "open": is_open})
dead = []
for ws_client in set(self._ws_clients):
try:
await ws_client.send_str(payload)
except Exception:
dead.append(ws_client)
for ws_client in dead:
self._ws_clients.discard(ws_client)
return web.json_response({"open": is_open})
async def _handle_clear_history(self, request: web.Request) -> web.Response:
if not self._check_auth(request):
return web.Response(text="Forbidden", status=403)
count = self._store.clear_history()
payload = json.dumps({"event": "history-cleared"})
dead = []
for ws_client in set(self._ws_clients):
try:
await ws_client.send_str(payload)
except Exception:
dead.append(ws_client)
for ws_client in dead:
self._ws_clients.discard(ws_client)
return web.json_response({"cleared": count})