diff --git a/bridge/config/config.go b/bridge/config/config.go index 5e23ff6..69d13cb 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -29,6 +29,8 @@ const ( EventGetChannelMembers = "get_channel_members" EventNoticeIRC = "notice_irc" EventReconnectKosmi = "reconnect_kosmi" + EventReconnectJackbox = "reconnect_jackbox" + EventReconnectAll = "reconnect_all" EventVotesQuery = "votes_query" ) diff --git a/bridge/irc/handlers.go b/bridge/irc/handlers.go index 96ade2e..5ff17ee 100644 --- a/bridge/irc/handlers.go +++ b/bridge/irc/handlers.go @@ -268,8 +268,10 @@ func (b *Birc) handlePrivMsg(client *girc.Client, event girc.Event) { } } - // Handle !kreconnect command: trigger Kosmi bridge reconnection - if strings.TrimSpace(rmsg.Text) == "!kreconnect" { + // Handle reconnect commands + trimmedText := strings.TrimSpace(rmsg.Text) + switch trimmedText { + case "!kreconnect": b.Log.Infof("!kreconnect command from %s on %s", event.Source.Name, rmsg.Channel) b.Remote <- config.Message{ Username: "system", @@ -279,6 +281,26 @@ func (b *Birc) handlePrivMsg(client *girc.Client, event girc.Event) { Event: config.EventReconnectKosmi, } return + case "!jreconnect": + b.Log.Infof("!jreconnect command from %s on %s", event.Source.Name, rmsg.Channel) + b.Remote <- config.Message{ + Username: "system", + Text: "jreconnect", + Channel: rmsg.Channel, + Account: b.Account, + Event: config.EventReconnectJackbox, + } + return + case "!reconnect": + b.Log.Infof("!reconnect command from %s on %s", event.Source.Name, rmsg.Channel) + b.Remote <- config.Message{ + Username: "system", + Text: "reconnect", + Channel: rmsg.Channel, + Account: b.Account, + Event: config.EventReconnectAll, + } + return } // Handle !votes command: query current game vote tally diff --git a/bridge/jackbox/manager.go b/bridge/jackbox/manager.go index 666d393..ecc3653 100644 --- a/bridge/jackbox/manager.go +++ b/bridge/jackbox/manager.go @@ -242,6 +242,37 @@ func (m *Manager) monitorActiveSessions() { } } +// Reconnect tears down the existing WebSocket client and establishes a new +// connection (re-authenticating for a fresh JWT). Safe to call even when +// already disconnected. +func (m *Manager) Reconnect() error { + if !m.enabled || !m.useWebSocket { + return fmt.Errorf("Jackbox WebSocket is not enabled") + } + + m.log.Info("Forcing Jackbox WebSocket reconnection...") + + // Tear down existing client + if m.wsClient != nil { + if err := m.wsClient.Close(); err != nil { + m.log.Errorf("Error closing existing WebSocket client: %v", err) + } + m.wsClient = nil + } + + // Re-authenticate for a fresh JWT + if err := m.client.Authenticate(); err != nil { + return fmt.Errorf("re-authentication failed: %w", err) + } + + // Rebuild the WebSocket client using the original callback + if m.messageCallback == nil { + return fmt.Errorf("no message callback registered") + } + + return m.startWebSocketClient(m.messageCallback) +} + // GetClient returns the Jackbox API client (may be nil if disabled) func (m *Manager) GetClient() *Client { return m.client diff --git a/bridge/jackbox/websocket_client.go b/bridge/jackbox/websocket_client.go index 84ffe62..980eacb 100644 --- a/bridge/jackbox/websocket_client.go +++ b/bridge/jackbox/websocket_client.go @@ -10,6 +10,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + jackboxPingInterval = 30 * time.Second + jackboxReadTimeout = 90 * time.Second +) + // WebSocketClient handles WebSocket connection to Jackbox API type WebSocketClient struct { apiURL string @@ -22,6 +27,7 @@ type WebSocketClient struct { reconnectDelay time.Duration maxReconnect time.Duration stopChan chan struct{} + listenDone chan struct{} // closed when the current listen() goroutine exits connected bool authenticated bool subscribedSession int @@ -109,8 +115,11 @@ func (c *WebSocketClient) Connect() error { c.conn = conn c.connected = true + c.listenDone = make(chan struct{}) c.log.Info("WebSocket connected") + conn.SetReadDeadline(time.Now().Add(jackboxReadTimeout)) + // Start message listener go c.listen() @@ -181,9 +190,9 @@ func (c *WebSocketClient) Unsubscribe(sessionID int) error { // listen handles incoming WebSocket messages func (c *WebSocketClient) listen() { defer c.handleDisconnect() + defer close(c.listenDone) - // Start heartbeat - go c.startHeartbeat() + go c.startHeartbeat(c.listenDone) for { select { @@ -196,6 +205,9 @@ func (c *WebSocketClient) listen() { return } + // Reset read deadline on every successful read + c.conn.SetReadDeadline(time.Now().Add(jackboxReadTimeout)) + c.handleMessage(message) } } @@ -438,15 +450,18 @@ func (c *WebSocketClient) AnnounceSessionEnd() { } } -// startHeartbeat sends ping messages periodically -func (c *WebSocketClient) startHeartbeat() { - ticker := time.NewTicker(30 * time.Second) +// startHeartbeat sends ping messages periodically. It exits when listenDone +// is closed (current connection ended) or stopChan is closed (full shutdown). +func (c *WebSocketClient) startHeartbeat(listenDone <-chan struct{}) { + ticker := time.NewTicker(jackboxPingInterval) defer ticker.Stop() for { select { case <-c.stopChan: return + case <-listenDone: + return case <-ticker.C: c.mu.Lock() if c.connected && c.conn != nil { @@ -474,7 +489,9 @@ func (c *WebSocketClient) sendMessage(msg WSMessage) error { return c.conn.WriteMessage(websocket.TextMessage, data) } -// handleDisconnect handles connection loss and attempts reconnection +// handleDisconnect handles connection loss and attempts reconnection with +// exponential backoff. Before each attempt it re-authenticates via the HTTP +// API to obtain a fresh JWT (the old token may be invalid after a server restart). func (c *WebSocketClient) handleDisconnect() { c.mu.Lock() c.connected = false @@ -487,7 +504,6 @@ func (c *WebSocketClient) handleDisconnect() { c.log.Warn("WebSocket disconnected, attempting to reconnect...") - // Exponential backoff reconnection delay := c.reconnectDelay for { select { @@ -496,21 +512,27 @@ func (c *WebSocketClient) handleDisconnect() { case <-time.After(delay): c.log.Infof("Reconnecting... (delay: %v)", delay) + // Re-authenticate to get a fresh JWT token before reconnecting. + if c.apiClient != nil { + if err := c.apiClient.Authenticate(); err != nil { + c.log.Errorf("Re-authentication failed: %v (will retry)", err) + delay = c.bumpDelay(delay) + continue + } + c.mu.Lock() + c.token = c.apiClient.GetToken() + c.mu.Unlock() + c.log.Info("Re-authenticated with fresh JWT token") + } + if err := c.Connect(); err != nil { c.log.Errorf("Reconnection failed: %v", err) - - // Increase delay with exponential backoff - delay *= 2 - if delay > c.maxReconnect { - delay = c.maxReconnect - } + delay = c.bumpDelay(delay) continue } - // Reconnected successfully c.log.Info("Reconnected successfully") - // Re-subscribe if we were subscribed before if c.subscribedSession > 0 { if err := c.Subscribe(c.subscribedSession); err != nil { c.log.Errorf("Failed to re-subscribe: %v", err) @@ -522,6 +544,14 @@ func (c *WebSocketClient) handleDisconnect() { } } +func (c *WebSocketClient) bumpDelay(d time.Duration) time.Duration { + d *= 2 + if d > c.maxReconnect { + d = c.maxReconnect + } + return d +} + // Close closes the WebSocket connection func (c *WebSocketClient) Close() error { c.mu.Lock() diff --git a/gateway/handlers.go b/gateway/handlers.go index 8216f77..5cb5c5b 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -57,26 +57,12 @@ func (r *Router) handleEventReconnectKosmi(msg *config.Message) bool { return false } - originChannel := msg.Channel - originAccount := msg.Account + r.sendConfirmation(msg, "Reconnecting Kosmi...") for _, gw := range r.Gateways { for _, br := range gw.Bridges { if br.Protocol == "kosmi" { r.logger.Infof("Reconnecting Kosmi bridge %s (requested via !kreconnect)", br.Account) - - // Send confirmation to the IRC channel that requested it - if originAccount != "" && originChannel != "" { - if ircBr, ok := gw.Bridges[originAccount]; ok { - ircBr.Send(config.Message{ - Text: "Reconnecting Kosmi...", - Channel: originChannel, - Username: "system", - Account: originAccount, - }) - } - } - go gw.reconnectBridge(br) return true } @@ -87,6 +73,79 @@ func (r *Router) handleEventReconnectKosmi(msg *config.Message) bool { return true } +// handleEventReconnectJackbox handles a manual Jackbox reconnect request (e.g. from !jreconnect). +// Returns true if the event was consumed. +func (r *Router) handleEventReconnectJackbox(msg *config.Message) bool { + if msg.Event != config.EventReconnectJackbox { + return false + } + + r.sendConfirmation(msg, "Reconnecting Jackbox...") + go func() { + if r.JackboxManager == nil || !r.JackboxManager.IsEnabled() { + r.logger.Warn("!jreconnect: Jackbox integration is not enabled") + return + } + r.logger.Info("Reconnecting Jackbox WebSocket (requested via !jreconnect)") + if err := r.JackboxManager.Reconnect(); err != nil { + r.logger.Errorf("Jackbox reconnection failed: %v", err) + } + }() + return true +} + +// handleEventReconnectAll handles a general reconnect request (e.g. from !reconnect). +// It reconnects all non-IRC bridges (Kosmi, Jackbox). +// Returns true if the event was consumed. +func (r *Router) handleEventReconnectAll(msg *config.Message) bool { + if msg.Event != config.EventReconnectAll { + return false + } + + r.sendConfirmation(msg, "Reconnecting all services...") + + // Reconnect Kosmi bridges + for _, gw := range r.Gateways { + for _, br := range gw.Bridges { + if br.Protocol == "kosmi" { + r.logger.Infof("Reconnecting Kosmi bridge %s (requested via !reconnect)", br.Account) + go gw.reconnectBridge(br) + } + } + } + + // Reconnect Jackbox + if r.JackboxManager != nil && r.JackboxManager.IsEnabled() { + go func() { + r.logger.Info("Reconnecting Jackbox WebSocket (requested via !reconnect)") + if err := r.JackboxManager.Reconnect(); err != nil { + r.logger.Errorf("Jackbox reconnection failed: %v", err) + } + }() + } + + return true +} + +// sendConfirmation sends a short confirmation message back to the IRC channel +// that originated a command event. +func (r *Router) sendConfirmation(msg *config.Message, text string) { + if msg.Account == "" || msg.Channel == "" { + return + } + for _, gw := range r.Gateways { + if ircBr, ok := gw.Bridges[msg.Account]; ok { + ircBr.Send(config.Message{ + Text: text, + Channel: msg.Channel, + Username: "system", + Account: msg.Account, + }) + return + } + } +} + // handleEventVotesQuery handles a !votes command by fetching vote data for the // currently playing game and broadcasting the result to all bridges. // Returns true if the event was consumed. diff --git a/gateway/router.go b/gateway/router.go index 0ea514e..14a0454 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -158,6 +158,12 @@ func (r *Router) handleReceive() { if r.handleEventReconnectKosmi(&msg) { continue } + if r.handleEventReconnectJackbox(&msg) { + continue + } + if r.handleEventReconnectAll(&msg) { + continue + } if r.handleEventVotesQuery(&msg) { continue }