sync
This commit is contained in:
117
MESSAGE_QUEUE_FIX.md
Normal file
117
MESSAGE_QUEUE_FIX.md
Normal file
@@ -0,0 +1,117 @@
|
||||
# Message Queue Fix: Handling Early Messages
|
||||
|
||||
## The Problem
|
||||
|
||||
Messages were being dropped with the warning:
|
||||
```
|
||||
⚠️ Remote channel full, dropping message (this shouldn't happen)
|
||||
```
|
||||
|
||||
But the Remote channel wasn't actually "full" - it **wasn't ready yet**.
|
||||
|
||||
### Root Cause: Timing Issue
|
||||
|
||||
Looking at the logs:
|
||||
```
|
||||
17:09:02 🎧 [KOSMI WEBSOCKET] Message listener started
|
||||
17:09:09 📨 Received message from Kosmi
|
||||
17:09:09 ⚠️ Remote channel full, dropping message
|
||||
17:09:17 Connection succeeded (IRC) ← 15 seconds later!
|
||||
```
|
||||
|
||||
**The problem**: Kosmi connects and starts receiving messages BEFORE other bridges (like IRC) are connected and the Matterbridge router is fully set up. The `b.Remote` channel exists but isn't being read yet, so our non-blocking send fails.
|
||||
|
||||
## The Solution
|
||||
|
||||
Added a **message queue** to buffer messages that arrive before the Remote channel is ready.
|
||||
|
||||
### How It Works
|
||||
|
||||
1. **Try to send immediately** using non-blocking `select`
|
||||
2. **If channel not ready**: Queue the message in memory
|
||||
3. **On next successful send**: Flush all queued messages
|
||||
4. **Also try flushing in background** goroutine
|
||||
|
||||
### Implementation
|
||||
|
||||
```go
|
||||
type Bkosmi struct {
|
||||
// ... existing fields ...
|
||||
messageQueue []config.Message // Buffer for early messages
|
||||
queueMutex sync.Mutex // Protect the queue
|
||||
}
|
||||
|
||||
func (b *Bkosmi) handleIncomingMessage(payload *NewMessagePayload) {
|
||||
// ... create rmsg ...
|
||||
|
||||
// Try to send to Remote channel
|
||||
select {
|
||||
case b.Remote <- rmsg:
|
||||
// Success! Also flush any queued messages
|
||||
b.flushMessageQueue()
|
||||
default:
|
||||
// Channel not ready - queue the message
|
||||
b.queueMutex.Lock()
|
||||
b.messageQueue = append(b.messageQueue, rmsg)
|
||||
b.queueMutex.Unlock()
|
||||
|
||||
// Try to flush in background
|
||||
go b.flushMessageQueue()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bkosmi) flushMessageQueue() {
|
||||
// Try to send all queued messages
|
||||
// Stop if channel becomes full again
|
||||
// Keep remaining messages in queue
|
||||
}
|
||||
```
|
||||
|
||||
## Why This Works
|
||||
|
||||
1. **No messages are lost** - They're buffered in memory until the router is ready
|
||||
2. **Non-blocking** - Uses goroutines and non-blocking selects throughout
|
||||
3. **Automatic retry** - Every successful send triggers a flush attempt
|
||||
4. **Thread-safe** - Uses mutex to protect the queue
|
||||
|
||||
## Expected Behavior After Fix
|
||||
|
||||
### Before
|
||||
```
|
||||
17:09:09 Received message from Kosmi: lo
|
||||
17:09:09 ⚠️ Remote channel full, dropping message
|
||||
17:09:24 Received message from Kosmi: lo
|
||||
17:09:24 ⚠️ Remote channel full, dropping message
|
||||
```
|
||||
|
||||
### After
|
||||
```
|
||||
17:09:09 Received message from Kosmi: lo
|
||||
17:09:09 📦 Remote channel not ready, queued message (1 in queue)
|
||||
17:09:24 Received message from Kosmi: lo
|
||||
17:09:24 📦 Remote channel not ready, queued message (2 in queue)
|
||||
17:09:30 ✅ Message forwarded to Matterbridge
|
||||
17:09:30 📤 Attempting to flush 2 queued messages
|
||||
17:09:30 ✅ Flushed all 2 queued messages
|
||||
```
|
||||
|
||||
## Why This Happened
|
||||
|
||||
When we added authentication and made other changes, we didn't change the connection timing, but the combination of:
|
||||
- Async message handling (fixed earlier)
|
||||
- Non-blocking channel sends (fixed earlier)
|
||||
- Early message arrival (fixed now)
|
||||
|
||||
All exposed this timing issue that was always there but masked by synchronous blocking behavior.
|
||||
|
||||
## Testing
|
||||
|
||||
```bash
|
||||
go build
|
||||
docker-compose build
|
||||
docker-compose up -d
|
||||
docker-compose logs -f matterbridge
|
||||
```
|
||||
|
||||
Send messages in Kosmi immediately after bot connects. They should all be queued and then flushed once the router is ready, with no messages dropped.
|
||||
|
||||
Reference in New Issue
Block a user