265 lines
7.4 KiB
Go
265 lines
7.4 KiB
Go
package gateway
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/42wim/matterbridge/bridge"
|
|
"github.com/42wim/matterbridge/bridge/config"
|
|
"github.com/42wim/matterbridge/bridge/jackbox"
|
|
"github.com/42wim/matterbridge/gateway/samechannel"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type Router struct {
|
|
config.Config
|
|
sync.RWMutex
|
|
|
|
BridgeMap map[string]bridge.Factory
|
|
Gateways map[string]*Gateway
|
|
Message chan config.Message
|
|
MattermostPlugin chan config.Message
|
|
JackboxManager *jackbox.Manager
|
|
|
|
logger *logrus.Entry
|
|
rootLogger *logrus.Logger
|
|
}
|
|
|
|
// NewRouter initializes a new Matterbridge router for the specified configuration and
|
|
// sets up all required gateways.
|
|
func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) {
|
|
logger := rootLogger.WithFields(logrus.Fields{"prefix": "router"})
|
|
|
|
r := &Router{
|
|
Config: cfg,
|
|
BridgeMap: bridgeMap,
|
|
Message: make(chan config.Message),
|
|
MattermostPlugin: make(chan config.Message),
|
|
Gateways: make(map[string]*Gateway),
|
|
logger: logger,
|
|
rootLogger: rootLogger,
|
|
}
|
|
|
|
// Initialize Jackbox manager
|
|
jackboxLogger := rootLogger.WithFields(logrus.Fields{"prefix": "jackbox"})
|
|
r.JackboxManager = jackbox.NewManager(cfg, jackboxLogger)
|
|
if err := r.JackboxManager.Initialize(); err != nil {
|
|
logger.Errorf("Failed to initialize Jackbox integration: %v", err)
|
|
// Don't fail startup if Jackbox integration fails
|
|
}
|
|
sgw := samechannel.New(cfg)
|
|
gwconfigs := append(sgw.GetConfig(), cfg.BridgeValues().Gateway...)
|
|
|
|
for idx := range gwconfigs {
|
|
entry := &gwconfigs[idx]
|
|
if !entry.Enable {
|
|
continue
|
|
}
|
|
if entry.Name == "" {
|
|
return nil, fmt.Errorf("%s", "Gateway without name found")
|
|
}
|
|
if _, ok := r.Gateways[entry.Name]; ok {
|
|
return nil, fmt.Errorf("Gateway with name %s already exists", entry.Name)
|
|
}
|
|
r.Gateways[entry.Name] = New(rootLogger, entry, r)
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Start will connect all gateways belonging to this router and subsequently route messages
|
|
// between them.
|
|
func (r *Router) Start() error {
|
|
m := make(map[string]*bridge.Bridge)
|
|
if len(r.Gateways) == 0 {
|
|
return fmt.Errorf("no [[gateway]] configured. See https://github.com/42wim/matterbridge/wiki/How-to-create-your-config for more info")
|
|
}
|
|
for _, gw := range r.Gateways {
|
|
r.logger.Infof("Parsing gateway %s", gw.Name)
|
|
if len(gw.Bridges) == 0 {
|
|
return fmt.Errorf("no bridges configured for gateway %s. See https://github.com/42wim/matterbridge/wiki/How-to-create-your-config for more info", gw.Name)
|
|
}
|
|
for _, br := range gw.Bridges {
|
|
m[br.Account] = br
|
|
}
|
|
}
|
|
// Inject Jackbox client into bridges if enabled
|
|
if r.JackboxManager.IsEnabled() {
|
|
r.injectJackboxClient(m)
|
|
}
|
|
|
|
for _, br := range m {
|
|
r.logger.Infof("Starting bridge: %s ", br.Account)
|
|
err := br.Connect()
|
|
if err != nil {
|
|
e := fmt.Errorf("Bridge %s failed to start: %v", br.Account, err)
|
|
if r.disableBridge(br, e) {
|
|
continue
|
|
}
|
|
return e
|
|
}
|
|
err = br.JoinChannels()
|
|
if err != nil {
|
|
e := fmt.Errorf("Bridge %s failed to join channel: %v", br.Account, err)
|
|
if r.disableBridge(br, e) {
|
|
continue
|
|
}
|
|
return e
|
|
}
|
|
}
|
|
// remove unused bridges
|
|
for _, gw := range r.Gateways {
|
|
for i, br := range gw.Bridges {
|
|
if br.Bridger == nil {
|
|
r.logger.Errorf("removing failed bridge %s", i)
|
|
delete(gw.Bridges, i)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start webhook server if Jackbox is enabled
|
|
if r.JackboxManager.IsEnabled() {
|
|
if err := r.JackboxManager.StartWebhookServer(r.broadcastJackboxMessage); err != nil {
|
|
r.logger.Errorf("Failed to start Jackbox webhook server: %v", err)
|
|
}
|
|
}
|
|
|
|
go r.handleReceive()
|
|
//go r.updateChannelMembers()
|
|
return nil
|
|
}
|
|
|
|
// disableBridge returns true and empties a bridge if we have IgnoreFailureOnStart configured
|
|
// otherwise returns false
|
|
func (r *Router) disableBridge(br *bridge.Bridge, err error) bool {
|
|
if r.BridgeValues().General.IgnoreFailureOnStart {
|
|
r.logger.Error(err)
|
|
// setting this bridge empty
|
|
*br = bridge.Bridge{
|
|
Log: br.Log,
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *Router) getBridge(account string) *bridge.Bridge {
|
|
for _, gw := range r.Gateways {
|
|
if br, ok := gw.Bridges[account]; ok {
|
|
return br
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Router) handleReceive() {
|
|
for msg := range r.Message {
|
|
msg := msg // scopelint
|
|
r.handleEventGetChannelMembers(&msg)
|
|
r.handleEventFailure(&msg)
|
|
r.handleEventRejoinChannels(&msg)
|
|
|
|
// Set message protocol based on the account it came from
|
|
msg.Protocol = r.getBridge(msg.Account).Protocol
|
|
|
|
filesHandled := false
|
|
for _, gw := range r.Gateways {
|
|
// record all the message ID's of the different bridges
|
|
var msgIDs []*BrMsgID
|
|
if gw.ignoreMessage(&msg) {
|
|
continue
|
|
}
|
|
msg.Timestamp = time.Now()
|
|
gw.modifyMessage(&msg)
|
|
if !filesHandled {
|
|
gw.handleFiles(&msg)
|
|
filesHandled = true
|
|
}
|
|
for _, br := range gw.Bridges {
|
|
msgIDs = append(msgIDs, gw.handleMessage(&msg, br)...)
|
|
}
|
|
|
|
if msg.ID != "" {
|
|
_, exists := gw.Messages.Get(msg.Protocol + " " + msg.ID)
|
|
|
|
// Only add the message ID if it doesn't already exist
|
|
//
|
|
// For some bridges we always add/update the message ID.
|
|
// This is necessary as msgIDs will change if a bridge returns
|
|
// a different ID in response to edits.
|
|
if !exists {
|
|
gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateChannelMembers sends every minute an GetChannelMembers event to all bridges.
|
|
func (r *Router) updateChannelMembers() {
|
|
// TODO sleep a minute because slack can take a while
|
|
// fix this by having actually connectionDone events send to the router
|
|
time.Sleep(time.Minute)
|
|
for {
|
|
for _, gw := range r.Gateways {
|
|
for _, br := range gw.Bridges {
|
|
// only for slack now
|
|
if br.Protocol != "slack" {
|
|
continue
|
|
}
|
|
r.logger.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account)
|
|
if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil {
|
|
r.logger.Errorf("updateChannelMembers: %s", err)
|
|
}
|
|
}
|
|
}
|
|
time.Sleep(time.Minute)
|
|
}
|
|
}
|
|
|
|
// injectJackboxClient injects the Jackbox client into all bridges
|
|
func (r *Router) injectJackboxClient(bridges map[string]*bridge.Bridge) {
|
|
client := r.JackboxManager.GetClient()
|
|
if client == nil {
|
|
return
|
|
}
|
|
|
|
for _, br := range bridges {
|
|
// Type assert to inject the client into supported bridge types
|
|
switch bridger := br.Bridger.(type) {
|
|
case interface{ SetJackboxClient(*jackbox.Client) }:
|
|
bridger.SetJackboxClient(client)
|
|
r.logger.Debugf("Injected Jackbox client into bridge %s", br.Account)
|
|
}
|
|
}
|
|
}
|
|
|
|
// broadcastJackboxMessage broadcasts a message from Jackbox to all connected bridges
|
|
func (r *Router) broadcastJackboxMessage(message string) {
|
|
// Check if Jackbox announcements are muted
|
|
if r.JackboxManager != nil && r.JackboxManager.IsMuted() {
|
|
r.logger.Debugf("Jackbox message suppressed (muted): %s", message)
|
|
return
|
|
}
|
|
|
|
r.logger.Infof("Broadcasting Jackbox message: %s", message)
|
|
|
|
// Send message to all gateways
|
|
for _, gw := range r.Gateways {
|
|
for _, br := range gw.Bridges {
|
|
// Send to each bridge
|
|
msg := config.Message{
|
|
Text: " " + message, // Add space before message for proper formatting
|
|
Username: "Jackbox",
|
|
Account: "jackbox",
|
|
Event: config.EventUserAction,
|
|
}
|
|
|
|
// Send directly to the bridge
|
|
if _, err := br.Send(msg); err != nil {
|
|
r.logger.Errorf("Failed to send Jackbox message to %s: %v", br.Account, err)
|
|
}
|
|
}
|
|
}
|
|
}
|