"""Tests for the AnnounceManager and PublicManager websocket managers.

Each test registers small fake websocket objects exposing an async
``send_json`` method and checks the manager's bookkeeping and broadcast
behaviour through its public attributes and methods.
"""

import pytest

from ntr_fetcher.websocket import AnnounceManager, PublicManager


@pytest.fixture
def manager():
    """Provide a fresh AnnounceManager for each test."""
    return AnnounceManager()


def test_no_bots_initially(manager):
    """A newly created manager reports a bot count of zero."""
    assert manager.bot_count == 0


@pytest.mark.asyncio
async def test_bot_subscribe_and_broadcast(manager):
    """A bot client is counted, receives a broadcast, and can be removed."""
    received = []

    class FakeWS:
        async def send_json(self, data):
            received.append(data)

    ws = FakeWS()
    manager.add_client(
        ws, role="bot", client_id="test-bot", remote_addr="127.0.0.1"
    )
    assert manager.bot_count == 1

    await manager.broadcast(
        {"type": "announce", "message": "Now Playing: Song #1"}
    )
    assert len(received) == 1
    assert received[0]["message"] == "Now Playing: Song #1"

    manager.remove_client(ws)
    assert manager.bot_count == 0


@pytest.mark.asyncio
async def test_viewer_not_counted_as_bot(manager):
    """A viewer client receives broadcasts but never affects bot_count."""
    received = []

    class FakeWS:
        async def send_json(self, data):
            received.append(data)

    ws = FakeWS()
    manager.add_client(ws, role="viewer")
    assert manager.bot_count == 0

    await manager.broadcast({"type": "announce", "message": "test"})
    assert len(received) == 1

    manager.remove_client(ws)
    assert manager.bot_count == 0


@pytest.mark.asyncio
async def test_broadcast_skips_dead_connections(manager):
    """A bot whose send_json raises is no longer counted after a broadcast."""

    class DeadWS:
        async def send_json(self, data):
            # Deliberately the broad base class: the test exercises the
            # manager's handling of an arbitrary send failure.
            raise Exception("connection closed")

    ws = DeadWS()
    manager.add_client(ws, role="bot", client_id="dead-bot")
    assert manager.bot_count == 1

    await manager.broadcast({"type": "announce", "message": "test"})
    assert manager.bot_count == 0


@pytest.mark.asyncio
async def test_bot_clients_returns_metadata(manager):
    """bot_clients exposes client_id, remote_addr and connected_at per bot."""

    class FakeWS:
        async def send_json(self, data):
            pass

    ws = FakeWS()
    manager.add_client(
        ws, role="bot", client_id="limnoria-prod", remote_addr="10.0.0.5"
    )

    clients = manager.bot_clients
    assert len(clients) == 1
    assert clients[0]["client_id"] == "limnoria-prod"
    assert clients[0]["remote_addr"] == "10.0.0.5"
    assert "connected_at" in clients[0]


@pytest.mark.asyncio
async def test_status_broadcast_includes_clients(manager):
    """Status messages report the bot subscriber count and bot metadata."""
    received = []

    class FakeWS:
        async def send_json(self, data):
            received.append(data)

    bot = FakeWS()
    viewer = FakeWS()
    manager.add_client(
        bot, role="bot", client_id="my-bot", remote_addr="1.2.3.4"
    )
    manager.add_client(viewer, role="viewer")

    await manager.broadcast_status()

    # Guard against a vacuous pass: with an empty ``received`` the loop
    # below would not execute and no assertion would ever run.
    assert received, "broadcast_status() delivered no messages"
    for msg in received:
        assert msg["type"] == "status"
        assert msg["subscribers"] == 1
        assert len(msg["clients"]) == 1
        assert msg["clients"][0]["client_id"] == "my-bot"


# --- PublicManager tests ---


@pytest.fixture
def public_manager():
    """Provide a fresh PublicManager for each test."""
    return PublicManager()


def test_public_no_clients_initially(public_manager):
    """A newly created public manager reports a client count of zero."""
    assert public_manager.client_count == 0


@pytest.mark.asyncio
async def test_public_add_remove_client(public_manager):
    """Adding then removing a client moves client_count from 1 back to 0."""

    class FakeWS:
        async def send_json(self, data):
            pass

    ws = FakeWS()
    public_manager.add_client(ws)
    assert public_manager.client_count == 1

    public_manager.remove_client(ws)
    assert public_manager.client_count == 0


@pytest.mark.asyncio
async def test_public_broadcast(public_manager):
    """A broadcast is delivered once to each of two registered clients."""
    received = []

    class FakeWS:
        async def send_json(self, data):
            received.append(data)

    ws1 = FakeWS()
    ws2 = FakeWS()
    public_manager.add_client(ws1)
    public_manager.add_client(ws2)

    await public_manager.broadcast({"type": "reveal", "position": 1})
    assert len(received) == 2
    assert all(m["type"] == "reveal" for m in received)


@pytest.mark.asyncio
async def test_public_broadcast_removes_dead_clients(public_manager):
    """A client whose send_json raises is not counted after a broadcast."""

    class DeadWS:
        async def send_json(self, data):
            # Deliberately the broad base class: the test exercises the
            # manager's handling of an arbitrary send failure.
            raise Exception("closed")

    ws = DeadWS()
    public_manager.add_client(ws)
    assert public_manager.client_count == 1

    await public_manager.broadcast({"type": "reveal", "position": 1})
    assert public_manager.client_count == 0