Public-facing page at / shows the current show's playlist with tracks obscured until the admin marks them as announced. Tracks reveal in real-time via a new unauthenticated /ws/public WebSocket. Server-side censorship on /public/playlist strips track details from unannounced items and sanitizes announced tracks to only expose frontend-needed fields (no raw_json, track_id, etc). Past episodes are browsable with fully revealed but sanitized tracklists. Also fixes RuntimeError on backfill shutdown by closing the httpx client on the same event loop that created it. Made-with: Cursor
159 lines
4.0 KiB
Python
159 lines
4.0 KiB
Python
import pytest
|
|
from ntr_fetcher.websocket import AnnounceManager, PublicManager
|
|
|
|
|
|
@pytest.fixture
|
|
def manager():
|
|
return AnnounceManager()
|
|
|
|
|
|
def test_no_bots_initially(manager):
|
|
assert manager.bot_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bot_subscribe_and_broadcast(manager):
|
|
received = []
|
|
|
|
class FakeWS:
|
|
async def send_json(self, data):
|
|
received.append(data)
|
|
|
|
ws = FakeWS()
|
|
manager.add_client(ws, role="bot", client_id="test-bot", remote_addr="127.0.0.1")
|
|
assert manager.bot_count == 1
|
|
|
|
await manager.broadcast({"type": "announce", "message": "Now Playing: Song #1"})
|
|
assert len(received) == 1
|
|
assert received[0]["message"] == "Now Playing: Song #1"
|
|
|
|
manager.remove_client(ws)
|
|
assert manager.bot_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_viewer_not_counted_as_bot(manager):
|
|
received = []
|
|
|
|
class FakeWS:
|
|
async def send_json(self, data):
|
|
received.append(data)
|
|
|
|
ws = FakeWS()
|
|
manager.add_client(ws, role="viewer")
|
|
assert manager.bot_count == 0
|
|
|
|
await manager.broadcast({"type": "announce", "message": "test"})
|
|
assert len(received) == 1
|
|
|
|
manager.remove_client(ws)
|
|
assert manager.bot_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_broadcast_skips_dead_connections(manager):
|
|
class DeadWS:
|
|
async def send_json(self, data):
|
|
raise Exception("connection closed")
|
|
|
|
ws = DeadWS()
|
|
manager.add_client(ws, role="bot", client_id="dead-bot")
|
|
assert manager.bot_count == 1
|
|
|
|
await manager.broadcast({"type": "announce", "message": "test"})
|
|
assert manager.bot_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bot_clients_returns_metadata(manager):
|
|
class FakeWS:
|
|
async def send_json(self, data):
|
|
pass
|
|
|
|
ws = FakeWS()
|
|
manager.add_client(ws, role="bot", client_id="limnoria-prod", remote_addr="10.0.0.5")
|
|
clients = manager.bot_clients
|
|
assert len(clients) == 1
|
|
assert clients[0]["client_id"] == "limnoria-prod"
|
|
assert clients[0]["remote_addr"] == "10.0.0.5"
|
|
assert "connected_at" in clients[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_broadcast_includes_clients(manager):
|
|
received = []
|
|
|
|
class FakeWS:
|
|
async def send_json(self, data):
|
|
received.append(data)
|
|
|
|
bot = FakeWS()
|
|
viewer = FakeWS()
|
|
manager.add_client(bot, role="bot", client_id="my-bot", remote_addr="1.2.3.4")
|
|
manager.add_client(viewer, role="viewer")
|
|
|
|
await manager.broadcast_status()
|
|
for msg in received:
|
|
assert msg["type"] == "status"
|
|
assert msg["subscribers"] == 1
|
|
assert len(msg["clients"]) == 1
|
|
assert msg["clients"][0]["client_id"] == "my-bot"
|
|
|
|
|
|
# --- PublicManager tests ---
|
|
|
|
|
|
@pytest.fixture
|
|
def public_manager():
|
|
return PublicManager()
|
|
|
|
|
|
def test_public_no_clients_initially(public_manager):
|
|
assert public_manager.client_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_add_remove_client(public_manager):
|
|
class FakeWS:
|
|
async def send_json(self, data):
|
|
pass
|
|
|
|
ws = FakeWS()
|
|
public_manager.add_client(ws)
|
|
assert public_manager.client_count == 1
|
|
|
|
public_manager.remove_client(ws)
|
|
assert public_manager.client_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_broadcast(public_manager):
|
|
received = []
|
|
|
|
class FakeWS:
|
|
async def send_json(self, data):
|
|
received.append(data)
|
|
|
|
ws1 = FakeWS()
|
|
ws2 = FakeWS()
|
|
public_manager.add_client(ws1)
|
|
public_manager.add_client(ws2)
|
|
|
|
await public_manager.broadcast({"type": "reveal", "position": 1})
|
|
assert len(received) == 2
|
|
assert all(m["type"] == "reveal" for m in received)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_broadcast_removes_dead_clients(public_manager):
|
|
class DeadWS:
|
|
async def send_json(self, data):
|
|
raise Exception("closed")
|
|
|
|
ws = DeadWS()
|
|
public_manager.add_client(ws)
|
|
assert public_manager.client_count == 1
|
|
|
|
await public_manager.broadcast({"type": "reveal", "position": 1})
|
|
assert public_manager.client_count == 0
|