feat: add IRC task with auto-reconnect and backoff
Made-with: Cursor
This commit is contained in:
99
src/irc_task.rs
Normal file
99
src/irc_task.rs
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use irc::client::prelude::*;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
|
use crate::config::IrcConfig;
|
||||||
|
use crate::events::{BridgeEvent, Source};
|
||||||
|
|
||||||
|
pub async fn run_irc_task(
|
||||||
|
config: IrcConfig,
|
||||||
|
event_tx: mpsc::Sender<BridgeEvent>,
|
||||||
|
mut outbound_rx: mpsc::Receiver<String>,
|
||||||
|
mut shutdown: tokio::sync::watch::Receiver<bool>,
|
||||||
|
) {
|
||||||
|
let mut backoff = Duration::from_secs(1);
|
||||||
|
let max_backoff = Duration::from_secs(60);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
info!(server = %config.server, channel = %config.channel, "Connecting to IRC");
|
||||||
|
|
||||||
|
match connect_and_run(&config, &event_tx, &mut outbound_rx, &mut shutdown).await {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("IRC task exiting cleanly");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(error = %e, "IRC connection error");
|
||||||
|
info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff");
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(backoff) => {},
|
||||||
|
_ = shutdown.changed() => return,
|
||||||
|
}
|
||||||
|
|
||||||
|
backoff = (backoff * 2).min(max_backoff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect_and_run(
|
||||||
|
config: &IrcConfig,
|
||||||
|
event_tx: &mpsc::Sender<BridgeEvent>,
|
||||||
|
outbound_rx: &mut mpsc::Receiver<String>,
|
||||||
|
shutdown: &mut tokio::sync::watch::Receiver<bool>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let irc_config = Config {
|
||||||
|
nickname: Some(config.nick.clone()),
|
||||||
|
server: Some(config.server.clone()),
|
||||||
|
port: Some(config.port),
|
||||||
|
use_tls: Some(config.tls),
|
||||||
|
channels: vec![config.channel.clone()],
|
||||||
|
..Config::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut client = Client::from_config(irc_config).await?;
|
||||||
|
client.identify()?;
|
||||||
|
|
||||||
|
let mut stream = client.stream()?;
|
||||||
|
let sender = client.sender();
|
||||||
|
|
||||||
|
info!("IRC connected, waiting for messages");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
msg = stream.next() => {
|
||||||
|
match msg {
|
||||||
|
Some(Ok(message)) => {
|
||||||
|
if let Command::PRIVMSG(ref target, ref text) = message.command {
|
||||||
|
if target == &config.channel {
|
||||||
|
let nick = message.source_nickname().unwrap_or("unknown").to_string();
|
||||||
|
let event = BridgeEvent::ChatMessage {
|
||||||
|
source: Source::Irc,
|
||||||
|
username: nick,
|
||||||
|
body: text.clone(),
|
||||||
|
id: None,
|
||||||
|
};
|
||||||
|
if event_tx.send(event).await.is_err() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Err(e)) => return Err(e.into()),
|
||||||
|
None => return Err(anyhow::anyhow!("IRC stream ended")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(text) = outbound_rx.recv() => {
|
||||||
|
sender.send_privmsg(&config.channel, &text)?;
|
||||||
|
}
|
||||||
|
_ = shutdown.changed() => {
|
||||||
|
let _ = sender.send_quit("Bridge shutting down");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ mod config;
|
|||||||
mod events;
|
mod events;
|
||||||
mod health;
|
mod health;
|
||||||
mod html;
|
mod html;
|
||||||
|
mod irc_task;
|
||||||
mod owncast_api;
|
mod owncast_api;
|
||||||
mod webhook;
|
mod webhook;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user