# Critical Fix: Asynchronous Message Handling

## The Problem

The bot received only ONE message from Kosmi and then hung, even though the WebSocket stayed connected.

### Root Cause

We were reading WebSocket responses **synchronously** during connection setup, but the server sends responses in a different order than we expected:

**What we were doing:**
1. Send `ExtendedCurrentUserQuery` → Try to read response immediately
2. Send `RoomChatQuery` → Try to read response immediately
3. Send `JoinRoom` → Try to read response immediately (loop 10 times)

**What the server actually sends:**
1. `complete` for `current-user` (from ExtendedCurrentUserQuery)
2. `next` for `join-room` (from JoinRoom)
3. `next` for `room-chat-query` (from RoomChatQuery)
4. ... more responses ...

**The mismatch:**
- We sent `RoomChatQuery` and tried to read its response
- But we actually read the `complete` message from the previous `current-user` query!
- This consumed a message that `listenForMessages` should have handled
- The message loop got out of sync and stopped processing new messages

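The mismatch can be replayed without a network connection. The sketch below is a simplified simulation, not the bot's code: `frame`, `readNext`, and `mismatches` are hypothetical names, and the arrival list assumes the server sends a `next` for `current-user` before its `complete` (inferred from the log lines in the Testing section).

```go
package main

import "fmt"

// frame is a simplified stand-in for a GraphQL-WS server message.
type frame struct {
	Type string // "next", "complete", ...
	ID   string // operation ID the frame belongs to
}

// readNext pops the oldest frame from the queue, the way a blocking
// socket read returns whatever the server sent next.
func readNext(queue *[]frame) frame {
	f := (*queue)[0]
	*queue = (*queue)[1:]
	return f
}

// mismatches replays the old setup logic: after each send, read one
// frame and assume it answers the operation just sent. It returns a
// description of every read whose ID did not match.
func mismatches(sent []string, arrivals []frame) []string {
	queue := append([]frame(nil), arrivals...) // copy, do not mutate the input
	var bad []string
	for _, id := range sent {
		if len(queue) == 0 {
			break
		}
		got := readNext(&queue)
		if got.ID != id {
			bad = append(bad, fmt.Sprintf("sent %s, read type=%s id=%s", id, got.Type, got.ID))
		}
	}
	return bad
}

func main() {
	// Order in which the old setup code sent its operations.
	sent := []string{"current-user", "room-chat-query", "join-room"}
	// Assumed arrival order; the leading "next" is an assumption.
	arrivals := []frame{
		{"next", "current-user"},
		{"complete", "current-user"},
		{"next", "join-room"},
		{"next", "room-chat-query"},
	}
	for _, m := range mismatches(sent, arrivals) {
		fmt.Println(m) // prints: sent room-chat-query, read type=complete id=current-user
	}
}
```

Under these assumptions the second read returns the `complete` frame for `current-user`, which is the same pairing the log line below shows.
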
### Evidence from Logs

```
time="2025-11-01T16:36:06-04:00" level=info msg="Chat history response: type=complete id=current-user"
```

We sent a query with ID `room-chat-query` but received a response with ID `current-user` - wrong message!

## The Fix

**Stop reading responses synchronously during setup.** Let the `listenForMessages` goroutine handle ALL incoming messages.

### Changes Made

1. **Removed synchronous read after `ExtendedCurrentUserQuery`**
   - Before: Send query → Read response
   - After: Send query → Continue (let listenForMessages handle it)

2. **Removed synchronous read after `RoomChatQuery`**
   - Before: Send query → Read response
   - After: Send query → Continue (let listenForMessages handle it)

3. **Removed synchronous loop after `JoinRoom`**
   - Before: Send mutation → Loop 10 times reading responses
   - After: Send mutation → Brief sleep → Continue (let listenForMessages handle it)

4. **Enhanced `listenForMessages` to handle all message types**
   - Added switch statement for `messageTypeNext`, `messageTypeError`, `messageTypeComplete`
   - Added specific handling for known operation IDs (`current-user`, `room-chat-query`, `join-room`, `subscribe-messages`)
   - Added better logging for debugging

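The routing described in change 4 might look roughly like the sketch below. It is an illustration under assumptions, not the actual `listenForMessages` body: the `wsMessage` struct and the `describe` helper are hypothetical, and the constant values are assumed to match the `type=` strings seen in the logs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message type names; values assumed from the "type=..." log output.
const (
	messageTypeNext     = "next"
	messageTypeError    = "error"
	messageTypeComplete = "complete"
)

// wsMessage is a hypothetical envelope; the real struct may differ.
type wsMessage struct {
	Type    string          `json:"type"`
	ID      string          `json:"id"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// describe decodes one raw frame and reports how a listener could
// route it. A real handler would act on the payload instead of
// returning a string.
func describe(raw []byte) (string, error) {
	var msg wsMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return "", fmt.Errorf("decode frame: %w", err)
	}
	switch msg.Type {
	case messageTypeNext:
		switch msg.ID {
		case "current-user":
			return "current user data", nil
		case "join-room":
			return "joined room", nil
		case "room-chat-query":
			return "chat history", nil
		case "subscribe-messages":
			return "new chat message", nil
		default:
			return "data for operation " + msg.ID, nil
		}
	case messageTypeError:
		return "error for operation " + msg.ID, nil
	case messageTypeComplete:
		return "operation " + msg.ID + " complete", nil
	default:
		return "ignored message type " + msg.Type, nil
	}
}

func main() {
	frames := []string{
		`{"type":"complete","id":"current-user"}`,
		`{"type":"next","id":"join-room","payload":{}}`,
	}
	for _, f := range frames {
		out, err := describe([]byte(f))
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(out)
	}
}
```

Switching on the type first and the operation ID second means a `complete` for `current-user` is handled as a completion and can no longer be mistaken for chat history.
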
### New Message Flow

```
Connection Setup (synchronous):
├─ Send connection_init
├─ Wait for connection_ack (ONLY synchronous read)
├─ Send ExtendedCurrentUserQuery
├─ Send JoinRoom
├─ Send RoomChatQuery
├─ Send RoomDisconnect subscription
├─ Send MemberJoins subscription
├─ Send MemberLeaves subscription
├─ Send NewMessageSubscription
└─ Start listenForMessages goroutine

listenForMessages (asynchronous):
├─ Reads ALL incoming messages
├─ Handles responses for all queries/mutations/subscriptions
└─ Processes new chat messages
```

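The flow above can be sketched against an in-memory transport. Everything here is hypothetical scaffolding for illustration: the `conn` interface, `fakeConn`, and `setup` are not the bot's real types, and sending operations with a `subscribe` message type is an assumption about the protocol variant in use. Only the four operation IDs named earlier are used, since the IDs of the other subscriptions are not given.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical minimal transport; a real WebSocket client
// would provide equivalent write and read calls.
type conn interface {
	Send(msgType, id string) error
	Recv() (msgType, id string, err error)
}

// fakeConn records sends and replays scripted server frames.
type fakeConn struct {
	sent   []string
	frames [][2]string // each entry is {type, id}
}

func (c *fakeConn) Send(msgType, id string) error {
	c.sent = append(c.sent, msgType+":"+id)
	return nil
}

func (c *fakeConn) Recv() (string, string, error) {
	if len(c.frames) == 0 {
		return "", "", errors.New("connection closed")
	}
	f := c.frames[0]
	c.frames = c.frames[1:]
	return f[0], f[1], nil
}

// setup does the handshake, sends every operation without reading a
// reply, then hands the connection to a single reader goroutine.
func setup(c conn, ops []string) (<-chan string, error) {
	if err := c.Send("connection_init", ""); err != nil {
		return nil, err
	}
	// The only synchronous read: wait for connection_ack.
	t, _, err := c.Recv()
	if err != nil {
		return nil, err
	}
	if t != "connection_ack" {
		return nil, fmt.Errorf("expected connection_ack, got %s", t)
	}
	for _, id := range ops {
		if err := c.Send("subscribe", id); err != nil { // message type is an assumption
			return nil, err
		}
	}
	received := make(chan string)
	go func() { // stand-in for listenForMessages: the sole reader from here on
		defer close(received)
		for {
			t, id, err := c.Recv()
			if err != nil {
				return
			}
			received <- t + ":" + id
		}
	}()
	return received, nil
}

func main() {
	c := &fakeConn{frames: [][2]string{
		{"connection_ack", ""},
		{"complete", "current-user"},
		{"next", "join-room"},
		{"next", "room-chat-query"},
	}}
	ops := []string{"current-user", "join-room", "room-chat-query", "subscribe-messages"}
	received, err := setup(c, ops)
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	for m := range received {
		fmt.Println("listener got", m)
	}
}
```

Because `setup` stops calling `Recv` once the goroutine starts, every frame after `connection_ack` reaches the listener in arrival order, whatever order the server chooses.
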
## Expected Behavior After Fix

1. ✅ Bot connects and authenticates
2. ✅ Bot sends all setup queries/subscriptions
3. ✅ `listenForMessages` handles all responses in order
4. ✅ Bot receives ALL messages continuously (not just one)
5. ✅ No "ALREADY_CONNECTED" errors
6. ✅ WebSocket stays alive and processes messages

## Testing

```bash
go build
docker-compose build
docker-compose up -d
docker-compose logs -f matterbridge
```

Look for:
- `🎧 [KOSMI WEBSOCKET] Message listener started`
- `📨 [KOSMI WEBSOCKET] Received: type=next id=current-user`
- `✅ Received current user data`
- `📨 [KOSMI WEBSOCKET] Received: type=next id=join-room`
- `✅ Successfully joined room`
- `📨 [KOSMI WEBSOCKET] Received: type=next id=room-chat-query`
- `✅ Received chat history`
- Multiple `📨 [KOSMI WEBSOCKET] Received: type=next id=subscribe-messages`
- Multiple `Received message from Kosmi:` entries

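If scanning the log by eye gets tedious, a small helper could check captured output for the markers listed above. This is an optional sketch, not part of the bot: `expectedMarkers` and `missingMarkers` are hypothetical names, and the sample log in `main` is invented purely to show the call.

```go
package main

import (
	"fmt"
	"strings"
)

// expectedMarkers are substrings taken from the log lines listed above
// (emoji prefixes dropped so the match does not depend on them).
var expectedMarkers = []string{
	"Message listener started",
	"Received: type=next id=current-user",
	"Received current user data",
	"Received: type=next id=join-room",
	"Successfully joined room",
	"Received: type=next id=room-chat-query",
	"Received chat history",
	"Received: type=next id=subscribe-messages",
	"Received message from Kosmi:",
}

// missingMarkers returns every marker that does not occur in logText,
// preserving the order of the markers slice.
func missingMarkers(logText string, markers []string) []string {
	var missing []string
	for _, m := range markers {
		if !strings.Contains(logText, m) {
			missing = append(missing, m)
		}
	}
	return missing
}

func main() {
	// Invented sample; in practice this would be captured log output.
	sample := "[KOSMI WEBSOCKET] Message listener started\n" +
		"[KOSMI WEBSOCKET] Received: type=next id=current-user\n"
	for _, m := range missingMarkers(sample, expectedMarkers) {
		fmt.Println("missing:", m)
	}
}
```

The helper only checks presence, so the "Multiple" entries still need a manual look to confirm messages keep arriving.
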
## Why This Matters

GraphQL-WS is an **asynchronous protocol**. The server can send responses in any order, and multiple responses can be in flight at once. By trying to read responses synchronously, we were:

1. **Breaking the message order** - Reading messages meant for the listener
2. **Creating race conditions** - Setup code and listener competing for messages
3. **Blocking the connection** - Waiting for specific responses that might never come (or might come in a different order)

The fix ensures that **only one goroutine** (`listenForMessages`) reads from the WebSocket once setup has received `connection_ack`. This removes the competition between setup code and the listener and ensures messages are processed in the order they arrive.