# Message Queue Fix: Handling Early Messages

## The Problem

Messages were being dropped with the warning:

```
⚠️ Remote channel full, dropping message (this shouldn't happen)
```

But the Remote channel wasn't actually "full" - it **wasn't ready yet**.

### Root Cause: Timing Issue

Looking at the logs:

```
17:09:02 🎧 [KOSMI WEBSOCKET] Message listener started
17:09:09 📨 Received message from Kosmi
17:09:09 ⚠️ Remote channel full, dropping message
17:09:17 Connection succeeded (IRC) ← 15 seconds after the listener started!
```

**The problem**: Kosmi connects and starts receiving messages BEFORE other bridges (like IRC) are connected and the Matterbridge router is fully set up. The `b.Remote` channel exists, but nothing is reading from it yet, so our non-blocking send falls through to `default` and the message is dropped.
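
The failure mode is easy to reproduce outside the bridge. This standalone sketch uses a hypothetical `trySend` helper with the same `select`/`default` shape as the bridge's send. On an unbuffered channel with no reader, and on a buffered channel whose buffer is full, the send falls through to `default` in exactly the same way, which is why a "channel full" warning cannot tell the two cases apart.

```go
package main

import "fmt"

// trySend is a hypothetical helper with the same select/default shape as the
// bridge's send: it never blocks and reports whether the message went out.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		// Reached both when no receiver is waiting on an unbuffered channel
		// and when a buffered channel is full.
		return false
	}
}

func main() {
	// Unbuffered channel that nobody reads yet (the router is not running).
	notReady := make(chan string)
	fmt.Println(trySend(notReady, "lo")) // false

	// Buffered channel with room for one message, then genuinely full.
	full := make(chan string, 1)
	fmt.Println(trySend(full, "lo")) // true
	fmt.Println(trySend(full, "lo")) // false
}
```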

## The Solution

Added a **message queue** to buffer messages that arrive before the Remote channel is ready.

### How It Works

1. **Try to send immediately** using a non-blocking `select`
2. **If the channel is not ready**: queue the message in memory
3. **On the next successful send**: flush all queued messages
4. **Also try flushing in a background goroutine**

### Implementation

```go
import (
	"sync"

	// Assumes the upstream matterbridge module path; adjust for a fork.
	"github.com/42wim/matterbridge/bridge/config"
)

type Bkosmi struct {
	// ... existing fields ...
	messageQueue []config.Message // Buffer for early messages
	queueMutex   sync.Mutex       // Protects messageQueue
}

func (b *Bkosmi) handleIncomingMessage(payload *NewMessagePayload) {
	// ... create rmsg ...

	// Try to send to the Remote channel without blocking
	select {
	case b.Remote <- rmsg:
		// Success: the router is reading, so flush any queued messages.
		// Note: rmsg is delivered before the older queued messages.
		b.flushMessageQueue()
	default:
		// Channel not ready: queue the message
		b.queueMutex.Lock()
		b.messageQueue = append(b.messageQueue, rmsg)
		b.queueMutex.Unlock()

		// Try to flush in the background
		go b.flushMessageQueue()
	}
}

// flushMessageQueue sends queued messages oldest-first using non-blocking
// sends. It stops at the first send that would block and keeps the
// remaining messages in the queue for a later attempt.
func (b *Bkosmi) flushMessageQueue() {
	b.queueMutex.Lock()
	defer b.queueMutex.Unlock()

	for len(b.messageQueue) > 0 {
		select {
		case b.Remote <- b.messageQueue[0]:
			b.messageQueue = b.messageQueue[1:]
		default:
			return // Channel not ready again; keep the rest queued
		}
	}
	b.messageQueue = nil // Queue drained; release the backing array
}
```
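
In the implementation above, a successful send delivers the new message before `flushMessageQueue` drains the older ones, so a burst of early messages can reach the router out of order. If ordering matters, one option is to always append to the queue and then flush oldest-first, so a new message can never overtake older ones. This is a minimal standalone sketch, not the bridge code: `orderedQueue`, `send` and `flushLocked` are hypothetical names, messages are plain strings instead of `config.Message`, and a small buffered channel stands in for a router that is not keeping up.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedQueue is a hypothetical stand-in for the bridge: remote plays the
// role of b.Remote, and messages are strings instead of config.Message.
type orderedQueue struct {
	remote chan string
	mu     sync.Mutex
	queue  []string
}

// flushLocked sends queued messages oldest-first without blocking and
// returns how many are still waiting. The caller must hold q.mu.
func (q *orderedQueue) flushLocked() int {
	for len(q.queue) > 0 {
		select {
		case q.remote <- q.queue[0]:
			q.queue = q.queue[1:]
		default:
			return len(q.queue) // receiver not ready; keep the rest
		}
	}
	return 0
}

// send always enqueues first, then flushes, so arrival order is preserved.
func (q *orderedQueue) send(msg string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.queue = append(q.queue, msg)
	q.flushLocked()
}

func main() {
	// A buffer of 2 stands in for a router that reads slowly.
	q := &orderedQueue{remote: make(chan string, 2)}
	for _, m := range []string{"a", "b", "c"} {
		q.send(m)
	}
	fmt.Println(len(q.queue))           // 1 ("c" is still queued)
	fmt.Println(<-q.remote, <-q.remote) // a b
	q.send("d")                         // flushes "c" before "d"
	fmt.Println(<-q.remote, <-q.remote) // c d
}
```

Routing every message through the queue costs one slice append even when the queue is empty, which should be negligible next to network I/O; in exchange there is a single code path that cannot reorder messages.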

## Why This Works

1. **No messages are dropped** - Early messages are buffered in memory until a flush succeeds once the router is reading
2. **Non-blocking** - Uses goroutines and non-blocking selects throughout
3. **Automatic retry** - Every successful send, and every newly queued message, triggers a flush attempt
4. **Thread-safe** - Uses a mutex to protect the queue

## Expected Behavior After Fix

### Before

```
17:09:09 Received message from Kosmi: lo
17:09:09 ⚠️ Remote channel full, dropping message
17:09:24 Received message from Kosmi: lo
17:09:24 ⚠️ Remote channel full, dropping message
```

### After

```
17:09:09 Received message from Kosmi: lo
17:09:09 📦 Remote channel not ready, queued message (1 in queue)
17:09:24 Received message from Kosmi: lo
17:09:24 📦 Remote channel not ready, queued message (2 in queue)
17:09:30 ✅ Message forwarded to Matterbridge
17:09:30 📤 Attempting to flush 2 queued messages
17:09:30 ✅ Flushed all 2 queued messages
```

## Why This Happened

When we added authentication and made other changes, we didn't change the connection timing, but the combination of:

- Async message handling (fixed earlier)
- Non-blocking channel sends (fixed earlier)
- Early message arrival (fixed now)

Together, these exposed a timing issue that had always been there but was masked by the earlier synchronous, blocking behavior.

## Testing

```bash
go build
docker-compose build
docker-compose up -d
docker-compose logs -f matterbridge
```

Send messages in Kosmi immediately after the bot connects. They should all be queued and then flushed once the router is ready, with no messages dropped.