46 lines
1.1 KiB
Python
46 lines
1.1 KiB
Python
|
|
import pytest
|
||
|
|
from ntr_fetcher.websocket import AnnounceManager
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def manager():
    """Return a newly constructed ``AnnounceManager`` for the requesting test."""
    return AnnounceManager()
|
||
|
|
|
||
|
|
|
||
|
|
def test_no_subscribers_initially(manager):
    """A manager that has had nothing added reports zero subscribers."""
    assert manager.subscriber_count == 0
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_subscribe_and_broadcast(manager):
    """Exercise the add -> broadcast -> remove cycle with a single subscriber.

    The subscriber count must follow each add/remove call, and a broadcast
    payload must reach the subscriber's ``send_json`` exactly once.
    """
    received = []

    class FakeWS:
        """Stand-in connection that records every payload passed to it."""

        async def send_json(self, data):
            received.append(data)

    ws = FakeWS()
    manager.add_subscriber(ws)
    assert manager.subscriber_count == 1

    await manager.broadcast({"type": "announce", "message": "Now Playing: Song #1"})
    assert len(received) == 1
    assert received[0]["message"] == "Now Playing: Song #1"

    manager.remove_subscriber(ws)
    assert manager.subscriber_count == 0
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_broadcast_skips_dead_connections(manager):
    """A subscriber whose ``send_json`` raises is no longer counted after a broadcast."""

    class DeadWS:
        """Stand-in connection whose every send attempt fails."""

        async def send_json(self, data):
            raise Exception("connection closed")

    ws = DeadWS()
    manager.add_subscriber(ws)
    assert manager.subscriber_count == 1

    # The broadcast is awaited without a try/except, so the test also fails
    # if the send error escapes from broadcast() instead of being handled.
    await manager.broadcast({"type": "announce", "message": "test"})
    assert manager.subscriber_count == 0
|