Files
NtR-soudcloud-fetcher/tests/test_websocket.py
cottongin 425a7047c3 feat: add public index page with censored playlist and live reveals
Public-facing page at / shows the current show's playlist with tracks
obscured until the admin marks them as announced. Tracks reveal in
real-time via a new unauthenticated /ws/public WebSocket. Server-side
censorship on /public/playlist strips track details from unannounced
items and sanitizes announced tracks to only expose frontend-needed
fields (no raw_json, track_id, etc). Past episodes are browsable with
fully revealed but sanitized tracklists.

Also fixes RuntimeError on backfill shutdown by closing the httpx
client on the same event loop that created it.

Made-with: Cursor
2026-04-01 23:50:41 -04:00

159 lines
4.0 KiB
Python

import pytest
from ntr_fetcher.websocket import AnnounceManager, PublicManager
@pytest.fixture
def manager() -> AnnounceManager:
    """Provide a newly constructed ``AnnounceManager`` to the requesting test."""
    return AnnounceManager()
def test_no_bots_initially(manager) -> None:
    """A manager with no clients added reports a ``bot_count`` of zero."""
    assert manager.bot_count == 0
@pytest.mark.asyncio
async def test_bot_subscribe_and_broadcast(manager) -> None:
    """A bot client is counted, receives a broadcast, and is uncounted on removal."""
    received = []

    class FakeWS:
        """Stand-in socket that records every payload passed to ``send_json``."""

        async def send_json(self, data):
            received.append(data)

    ws = FakeWS()
    manager.add_client(ws, role="bot", client_id="test-bot", remote_addr="127.0.0.1")
    assert manager.bot_count == 1

    await manager.broadcast({"type": "announce", "message": "Now Playing: Song #1"})
    assert len(received) == 1
    assert received[0]["message"] == "Now Playing: Song #1"

    manager.remove_client(ws)
    assert manager.bot_count == 0
@pytest.mark.asyncio
async def test_viewer_not_counted_as_bot(manager) -> None:
    """A ``role="viewer"`` client receives broadcasts but never raises ``bot_count``."""
    received = []

    class FakeWS:
        """Stand-in socket that records every payload passed to ``send_json``."""

        async def send_json(self, data):
            received.append(data)

    ws = FakeWS()
    manager.add_client(ws, role="viewer")
    assert manager.bot_count == 0

    # The viewer is still a broadcast recipient even though it is not a bot.
    await manager.broadcast({"type": "announce", "message": "test"})
    assert len(received) == 1

    manager.remove_client(ws)
    assert manager.bot_count == 0
@pytest.mark.asyncio
async def test_broadcast_skips_dead_connections(manager) -> None:
    """A bot whose ``send_json`` raises is no longer counted after a broadcast."""

    class DeadWS:
        """Stand-in socket whose ``send_json`` always raises."""

        async def send_json(self, data):
            raise Exception("connection closed")

    ws = DeadWS()
    manager.add_client(ws, role="bot", client_id="dead-bot")
    assert manager.bot_count == 1

    # The broadcast itself must not propagate the send failure to the caller.
    await manager.broadcast({"type": "announce", "message": "test"})
    assert manager.bot_count == 0
@pytest.mark.asyncio
async def test_bot_clients_returns_metadata(manager) -> None:
    """``bot_clients`` exposes client_id, remote_addr and connected_at per bot."""

    class FakeWS:
        """Stand-in socket whose ``send_json`` does nothing."""

        async def send_json(self, data):
            pass

    ws = FakeWS()
    manager.add_client(ws, role="bot", client_id="limnoria-prod", remote_addr="10.0.0.5")

    clients = manager.bot_clients
    assert len(clients) == 1
    assert clients[0]["client_id"] == "limnoria-prod"
    assert clients[0]["remote_addr"] == "10.0.0.5"
    assert "connected_at" in clients[0]
@pytest.mark.asyncio
async def test_status_broadcast_includes_clients(manager) -> None:
    """Every status message reports one subscriber and lists the bot's client_id."""
    received = []

    class FakeWS:
        """Stand-in socket that records every payload passed to ``send_json``."""

        async def send_json(self, data):
            received.append(data)

    bot = FakeWS()
    viewer = FakeWS()
    manager.add_client(bot, role="bot", client_id="my-bot", remote_addr="1.2.3.4")
    manager.add_client(viewer, role="viewer")

    await manager.broadcast_status()

    # Without this guard the loop below passes vacuously when nothing was sent.
    assert received, "broadcast_status() delivered no messages"
    for msg in received:
        assert msg["type"] == "status"
        assert msg["subscribers"] == 1
        assert len(msg["clients"]) == 1
        assert msg["clients"][0]["client_id"] == "my-bot"
# --- PublicManager tests ---
@pytest.fixture
def public_manager() -> PublicManager:
    """Provide a newly constructed ``PublicManager`` to the requesting test."""
    return PublicManager()
def test_public_no_clients_initially(public_manager) -> None:
    """A public manager with no clients added reports a ``client_count`` of zero."""
    assert public_manager.client_count == 0
@pytest.mark.asyncio
async def test_public_add_remove_client(public_manager) -> None:
    """``client_count`` rises to one on ``add_client`` and returns to zero on removal."""

    class FakeWS:
        """Stand-in socket whose ``send_json`` does nothing."""

        async def send_json(self, data):
            pass

    ws = FakeWS()
    public_manager.add_client(ws)
    assert public_manager.client_count == 1

    public_manager.remove_client(ws)
    assert public_manager.client_count == 0
@pytest.mark.asyncio
async def test_public_broadcast(public_manager) -> None:
    """A broadcast reaches each of two registered clients exactly once."""
    received = []

    class FakeWS:
        """Stand-in socket that records every payload passed to ``send_json``."""

        async def send_json(self, data):
            received.append(data)

    ws1 = FakeWS()
    ws2 = FakeWS()
    public_manager.add_client(ws1)
    public_manager.add_client(ws2)

    await public_manager.broadcast({"type": "reveal", "position": 1})

    # Both sockets append to the same list, so two entries means one per client.
    assert len(received) == 2
    assert all(m["type"] == "reveal" for m in received)
@pytest.mark.asyncio
async def test_public_broadcast_removes_dead_clients(public_manager) -> None:
    """A client whose ``send_json`` raises is no longer counted after a broadcast."""

    class DeadWS:
        """Stand-in socket whose ``send_json`` always raises."""

        async def send_json(self, data):
            raise Exception("closed")

    ws = DeadWS()
    public_manager.add_client(ws)
    assert public_manager.client_count == 1

    # The broadcast itself must not propagate the send failure to the caller.
    await public_manager.broadcast({"type": "reveal", "position": 1})
    assert public_manager.client_count == 0