Add connection resilience and reconnect commands for Kosmi and Jackbox

Kosmi WebSocket would silently die after hours/days with no reconnection.
Jackbox WebSocket failed to reconnect after API server restarts (stale JWT)
and leaked heartbeat goroutines on each reconnect cycle.

Kosmi changes:
- Add WebSocket ping/pong keepalive (30s ping, 90s read deadline)
- Send EventFailure on unexpected disconnect to trigger gateway reconnectBridge()
- Add intentionalDisconnect flag to prevent false failure events on clean shutdown
- Fix Disconnect() to be safe for reconnect cycles

Jackbox changes:
- Add read deadline (90s) to detect stale connections
- Fix heartbeat goroutine leak via per-connection listenDone channel
- Re-authenticate for fresh JWT before each reconnect attempt
- Add Manager.Reconnect() for on-demand teardown and rebuild

IRC commands:
- !kreconnect - reconnect Kosmi bridge
- !jreconnect - reconnect Jackbox WebSocket
- !reconnect  - reconnect all services (Kosmi + Jackbox)

Made-with: Cursor
This commit is contained in:
cottongin
2026-04-05 05:30:39 -04:00
parent bec3615d2b
commit 4fc7f08b24
6 changed files with 182 additions and 32 deletions

View File

@@ -29,6 +29,8 @@ const (
EventGetChannelMembers = "get_channel_members" EventGetChannelMembers = "get_channel_members"
EventNoticeIRC = "notice_irc" EventNoticeIRC = "notice_irc"
EventReconnectKosmi = "reconnect_kosmi" EventReconnectKosmi = "reconnect_kosmi"
EventReconnectJackbox = "reconnect_jackbox"
EventReconnectAll = "reconnect_all"
EventVotesQuery = "votes_query" EventVotesQuery = "votes_query"
) )

View File

@@ -268,8 +268,10 @@ func (b *Birc) handlePrivMsg(client *girc.Client, event girc.Event) {
} }
} }
// Handle !kreconnect command: trigger Kosmi bridge reconnection // Handle reconnect commands
if strings.TrimSpace(rmsg.Text) == "!kreconnect" { trimmedText := strings.TrimSpace(rmsg.Text)
switch trimmedText {
case "!kreconnect":
b.Log.Infof("!kreconnect command from %s on %s", event.Source.Name, rmsg.Channel) b.Log.Infof("!kreconnect command from %s on %s", event.Source.Name, rmsg.Channel)
b.Remote <- config.Message{ b.Remote <- config.Message{
Username: "system", Username: "system",
@@ -279,6 +281,26 @@ func (b *Birc) handlePrivMsg(client *girc.Client, event girc.Event) {
Event: config.EventReconnectKosmi, Event: config.EventReconnectKosmi,
} }
return return
case "!jreconnect":
b.Log.Infof("!jreconnect command from %s on %s", event.Source.Name, rmsg.Channel)
b.Remote <- config.Message{
Username: "system",
Text: "jreconnect",
Channel: rmsg.Channel,
Account: b.Account,
Event: config.EventReconnectJackbox,
}
return
case "!reconnect":
b.Log.Infof("!reconnect command from %s on %s", event.Source.Name, rmsg.Channel)
b.Remote <- config.Message{
Username: "system",
Text: "reconnect",
Channel: rmsg.Channel,
Account: b.Account,
Event: config.EventReconnectAll,
}
return
} }
// Handle !votes command: query current game vote tally // Handle !votes command: query current game vote tally

View File

@@ -242,6 +242,37 @@ func (m *Manager) monitorActiveSessions() {
} }
} }
// Reconnect tears down the existing WebSocket client and establishes a new
// connection (re-authenticating for a fresh JWT). Safe to call even when
// already disconnected.
func (m *Manager) Reconnect() error {
if !m.enabled || !m.useWebSocket {
return fmt.Errorf("Jackbox WebSocket is not enabled")
}
m.log.Info("Forcing Jackbox WebSocket reconnection...")
// Tear down existing client
if m.wsClient != nil {
if err := m.wsClient.Close(); err != nil {
m.log.Errorf("Error closing existing WebSocket client: %v", err)
}
m.wsClient = nil
}
// Re-authenticate for a fresh JWT
if err := m.client.Authenticate(); err != nil {
return fmt.Errorf("re-authentication failed: %w", err)
}
// Rebuild the WebSocket client using the original callback
if m.messageCallback == nil {
return fmt.Errorf("no message callback registered")
}
return m.startWebSocketClient(m.messageCallback)
}
// GetClient returns the Jackbox API client (may be nil if disabled) // GetClient returns the Jackbox API client (may be nil if disabled)
func (m *Manager) GetClient() *Client { func (m *Manager) GetClient() *Client {
return m.client return m.client

View File

@@ -10,6 +10,11 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
const (
jackboxPingInterval = 30 * time.Second
jackboxReadTimeout = 90 * time.Second
)
// WebSocketClient handles WebSocket connection to Jackbox API // WebSocketClient handles WebSocket connection to Jackbox API
type WebSocketClient struct { type WebSocketClient struct {
apiURL string apiURL string
@@ -22,6 +27,7 @@ type WebSocketClient struct {
reconnectDelay time.Duration reconnectDelay time.Duration
maxReconnect time.Duration maxReconnect time.Duration
stopChan chan struct{} stopChan chan struct{}
listenDone chan struct{} // closed when the current listen() goroutine exits
connected bool connected bool
authenticated bool authenticated bool
subscribedSession int subscribedSession int
@@ -109,8 +115,11 @@ func (c *WebSocketClient) Connect() error {
c.conn = conn c.conn = conn
c.connected = true c.connected = true
c.listenDone = make(chan struct{})
c.log.Info("WebSocket connected") c.log.Info("WebSocket connected")
conn.SetReadDeadline(time.Now().Add(jackboxReadTimeout))
// Start message listener // Start message listener
go c.listen() go c.listen()
@@ -181,9 +190,9 @@ func (c *WebSocketClient) Unsubscribe(sessionID int) error {
// listen handles incoming WebSocket messages // listen handles incoming WebSocket messages
func (c *WebSocketClient) listen() { func (c *WebSocketClient) listen() {
defer c.handleDisconnect() defer c.handleDisconnect()
defer close(c.listenDone)
// Start heartbeat go c.startHeartbeat(c.listenDone)
go c.startHeartbeat()
for { for {
select { select {
@@ -196,6 +205,9 @@ func (c *WebSocketClient) listen() {
return return
} }
// Reset read deadline on every successful read
c.conn.SetReadDeadline(time.Now().Add(jackboxReadTimeout))
c.handleMessage(message) c.handleMessage(message)
} }
} }
@@ -438,15 +450,18 @@ func (c *WebSocketClient) AnnounceSessionEnd() {
} }
} }
// startHeartbeat sends ping messages periodically // startHeartbeat sends ping messages periodically. It exits when listenDone
func (c *WebSocketClient) startHeartbeat() { // is closed (current connection ended) or stopChan is closed (full shutdown).
ticker := time.NewTicker(30 * time.Second) func (c *WebSocketClient) startHeartbeat(listenDone <-chan struct{}) {
ticker := time.NewTicker(jackboxPingInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-c.stopChan: case <-c.stopChan:
return return
case <-listenDone:
return
case <-ticker.C: case <-ticker.C:
c.mu.Lock() c.mu.Lock()
if c.connected && c.conn != nil { if c.connected && c.conn != nil {
@@ -474,7 +489,9 @@ func (c *WebSocketClient) sendMessage(msg WSMessage) error {
return c.conn.WriteMessage(websocket.TextMessage, data) return c.conn.WriteMessage(websocket.TextMessage, data)
} }
// handleDisconnect handles connection loss and attempts reconnection // handleDisconnect handles connection loss and attempts reconnection with
// exponential backoff. Before each attempt it re-authenticates via the HTTP
// API to obtain a fresh JWT (the old token may be invalid after a server restart).
func (c *WebSocketClient) handleDisconnect() { func (c *WebSocketClient) handleDisconnect() {
c.mu.Lock() c.mu.Lock()
c.connected = false c.connected = false
@@ -487,7 +504,6 @@ func (c *WebSocketClient) handleDisconnect() {
c.log.Warn("WebSocket disconnected, attempting to reconnect...") c.log.Warn("WebSocket disconnected, attempting to reconnect...")
// Exponential backoff reconnection
delay := c.reconnectDelay delay := c.reconnectDelay
for { for {
select { select {
@@ -496,21 +512,27 @@ func (c *WebSocketClient) handleDisconnect() {
case <-time.After(delay): case <-time.After(delay):
c.log.Infof("Reconnecting... (delay: %v)", delay) c.log.Infof("Reconnecting... (delay: %v)", delay)
// Re-authenticate to get a fresh JWT token before reconnecting.
if c.apiClient != nil {
if err := c.apiClient.Authenticate(); err != nil {
c.log.Errorf("Re-authentication failed: %v (will retry)", err)
delay = c.bumpDelay(delay)
continue
}
c.mu.Lock()
c.token = c.apiClient.GetToken()
c.mu.Unlock()
c.log.Info("Re-authenticated with fresh JWT token")
}
if err := c.Connect(); err != nil { if err := c.Connect(); err != nil {
c.log.Errorf("Reconnection failed: %v", err) c.log.Errorf("Reconnection failed: %v", err)
delay = c.bumpDelay(delay)
// Increase delay with exponential backoff
delay *= 2
if delay > c.maxReconnect {
delay = c.maxReconnect
}
continue continue
} }
// Reconnected successfully
c.log.Info("Reconnected successfully") c.log.Info("Reconnected successfully")
// Re-subscribe if we were subscribed before
if c.subscribedSession > 0 { if c.subscribedSession > 0 {
if err := c.Subscribe(c.subscribedSession); err != nil { if err := c.Subscribe(c.subscribedSession); err != nil {
c.log.Errorf("Failed to re-subscribe: %v", err) c.log.Errorf("Failed to re-subscribe: %v", err)
@@ -522,6 +544,14 @@ func (c *WebSocketClient) handleDisconnect() {
} }
} }
func (c *WebSocketClient) bumpDelay(d time.Duration) time.Duration {
d *= 2
if d > c.maxReconnect {
d = c.maxReconnect
}
return d
}
// Close closes the WebSocket connection // Close closes the WebSocket connection
func (c *WebSocketClient) Close() error { func (c *WebSocketClient) Close() error {
c.mu.Lock() c.mu.Lock()

View File

@@ -57,26 +57,12 @@ func (r *Router) handleEventReconnectKosmi(msg *config.Message) bool {
return false return false
} }
originChannel := msg.Channel r.sendConfirmation(msg, "Reconnecting Kosmi...")
originAccount := msg.Account
for _, gw := range r.Gateways { for _, gw := range r.Gateways {
for _, br := range gw.Bridges { for _, br := range gw.Bridges {
if br.Protocol == "kosmi" { if br.Protocol == "kosmi" {
r.logger.Infof("Reconnecting Kosmi bridge %s (requested via !kreconnect)", br.Account) r.logger.Infof("Reconnecting Kosmi bridge %s (requested via !kreconnect)", br.Account)
// Send confirmation to the IRC channel that requested it
if originAccount != "" && originChannel != "" {
if ircBr, ok := gw.Bridges[originAccount]; ok {
ircBr.Send(config.Message{
Text: "Reconnecting Kosmi...",
Channel: originChannel,
Username: "system",
Account: originAccount,
})
}
}
go gw.reconnectBridge(br) go gw.reconnectBridge(br)
return true return true
} }
@@ -87,6 +73,79 @@ func (r *Router) handleEventReconnectKosmi(msg *config.Message) bool {
return true return true
} }
// handleEventReconnectJackbox handles a manual Jackbox reconnect request (e.g. from !jreconnect).
// Returns true if the event was consumed.
func (r *Router) handleEventReconnectJackbox(msg *config.Message) bool {
if msg.Event != config.EventReconnectJackbox {
return false
}
r.sendConfirmation(msg, "Reconnecting Jackbox...")
go func() {
if r.JackboxManager == nil || !r.JackboxManager.IsEnabled() {
r.logger.Warn("!jreconnect: Jackbox integration is not enabled")
return
}
r.logger.Info("Reconnecting Jackbox WebSocket (requested via !jreconnect)")
if err := r.JackboxManager.Reconnect(); err != nil {
r.logger.Errorf("Jackbox reconnection failed: %v", err)
}
}()
return true
}
// handleEventReconnectAll handles a general reconnect request (e.g. from !reconnect).
// It reconnects all non-IRC bridges (Kosmi, Jackbox).
// Returns true if the event was consumed.
func (r *Router) handleEventReconnectAll(msg *config.Message) bool {
if msg.Event != config.EventReconnectAll {
return false
}
r.sendConfirmation(msg, "Reconnecting all services...")
// Reconnect Kosmi bridges
for _, gw := range r.Gateways {
for _, br := range gw.Bridges {
if br.Protocol == "kosmi" {
r.logger.Infof("Reconnecting Kosmi bridge %s (requested via !reconnect)", br.Account)
go gw.reconnectBridge(br)
}
}
}
// Reconnect Jackbox
if r.JackboxManager != nil && r.JackboxManager.IsEnabled() {
go func() {
r.logger.Info("Reconnecting Jackbox WebSocket (requested via !reconnect)")
if err := r.JackboxManager.Reconnect(); err != nil {
r.logger.Errorf("Jackbox reconnection failed: %v", err)
}
}()
}
return true
}
// sendConfirmation sends a short confirmation message back to the IRC channel
// that originated a command event.
func (r *Router) sendConfirmation(msg *config.Message, text string) {
if msg.Account == "" || msg.Channel == "" {
return
}
for _, gw := range r.Gateways {
if ircBr, ok := gw.Bridges[msg.Account]; ok {
ircBr.Send(config.Message{
Text: text,
Channel: msg.Channel,
Username: "system",
Account: msg.Account,
})
return
}
}
}
// handleEventVotesQuery handles a !votes command by fetching vote data for the // handleEventVotesQuery handles a !votes command by fetching vote data for the
// currently playing game and broadcasting the result to all bridges. // currently playing game and broadcasting the result to all bridges.
// Returns true if the event was consumed. // Returns true if the event was consumed.

View File

@@ -158,6 +158,12 @@ func (r *Router) handleReceive() {
if r.handleEventReconnectKosmi(&msg) { if r.handleEventReconnectKosmi(&msg) {
continue continue
} }
if r.handleEventReconnectJackbox(&msg) {
continue
}
if r.handleEventReconnectAll(&msg) {
continue
}
if r.handleEventVotesQuery(&msg) { if r.handleEventVotesQuery(&msg) {
continue continue
} }