diff --git a/src/main.rs b/src/main.rs index 0e86b11..260c885 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod health; mod html; mod irc_task; mod owncast_api; +mod router; mod webhook; mod websocket; diff --git a/src/router.rs b/src/router.rs new file mode 100644 index 0000000..862e5aa --- /dev/null +++ b/src/router.rs @@ -0,0 +1,236 @@ +use std::collections::{HashSet, VecDeque}; +use std::time::Instant; + +use tokio::sync::mpsc; +use tracing::{info, warn}; + +use crate::config::BridgeSettings; +use crate::events::{BridgeEvent, ControlCommand, OwncastState, Source}; +use crate::owncast_api::OwncastApiClient; + +pub struct DedupTracker { + seen: VecDeque, + set: HashSet, + capacity: usize, +} + +impl DedupTracker { + pub fn new(capacity: usize) -> Self { + Self { + seen: VecDeque::with_capacity(capacity), + set: HashSet::with_capacity(capacity), + capacity, + } + } + + pub fn is_duplicate(&mut self, id: &str) -> bool { + if self.set.contains(id) { + return true; + } + if self.seen.len() >= self.capacity { + if let Some(old) = self.seen.pop_front() { + self.set.remove(&old); + } + } + self.set.insert(id.to_string()); + self.seen.push_back(id.to_string()); + false + } +} + +pub struct EchoSuppressor { + recent: VecDeque, + capacity: usize, +} + +impl EchoSuppressor { + pub fn new(capacity: usize) -> Self { + Self { + recent: VecDeque::with_capacity(capacity), + capacity, + } + } + + pub fn record_sent(&mut self, body: &str) { + if self.recent.len() >= self.capacity { + self.recent.pop_front(); + } + self.recent.push_back(body.to_string()); + } + + pub fn is_echo(&mut self, body: &str) -> bool { + if let Some(pos) = self.recent.iter().position(|s| s == body) { + self.recent.remove(pos); + true + } else { + false + } + } +} + +pub async fn run_router( + settings: BridgeSettings, + owncast_url: String, + api_client: OwncastApiClient, + mut event_rx: mpsc::Receiver, + irc_outbound_tx: mpsc::Sender, + mut state_rx: mpsc::Receiver, + mut control_rx: mpsc::Receiver, + shutdown_tx: tokio::sync::watch::Sender, + start_time: Instant, +) { + let mut dedup = DedupTracker::new(500); + let mut echo_suppressor = EchoSuppressor::new(50); + let mut owncast_state = OwncastState::Unavailable; + + loop { + tokio::select! { + Some(event) = event_rx.recv() => { + handle_event( + event, + &settings, + &owncast_url, + &api_client, + &irc_outbound_tx, + &mut dedup, + &mut echo_suppressor, + &owncast_state, + ).await; + } + Some(new_state) = state_rx.recv() => { + if new_state != owncast_state { + handle_state_change(&new_state, &owncast_state, &irc_outbound_tx).await; + owncast_state = new_state; + } + } + Some(cmd) = control_rx.recv() => { + match cmd { + ControlCommand::Quit => { + info!("Quit command received, shutting down"); + let _ = shutdown_tx.send(true); + return; + } + ControlCommand::Status { reply } => { + let status = crate::events::BridgeStatus { + irc_connected: true, + owncast_state: format!("{:?}", owncast_state), + webhook_listening: true, + websocket_connected: false, + uptime_secs: start_time.elapsed().as_secs(), + }; + let _ = reply.send(status); + } + other => { + warn!(command = ?other, "Control command not yet implemented in router"); + } + } + } + else => break, + } + } +} + +async fn handle_event( + event: BridgeEvent, + settings: &BridgeSettings, + owncast_url: &str, + api_client: &OwncastApiClient, + irc_tx: &mpsc::Sender, + dedup: &mut DedupTracker, + echo: &mut EchoSuppressor, + owncast_state: &OwncastState, +) { + match event { + BridgeEvent::ChatMessage { source, username, body, id } => { + if let Some(ref msg_id) = id { + if dedup.is_duplicate(msg_id) { + return; + } + } + + match source { + Source::Irc => { + if *owncast_state == OwncastState::Unavailable { + return; + } + let formatted = format!("{} <{}> {}", settings.irc_prefix, username, body); + echo.record_sent(&formatted); + if let Err(e) = api_client.send_chat_message(&formatted).await { + warn!(error = %e, "Failed to send to Owncast"); + } + } + Source::Owncast => { + let formatted = format!("{} <{}> {}", settings.owncast_prefix, username, body); + if echo.is_echo(&body) { + return; + } + if irc_tx.send(formatted).await.is_err() { + warn!("IRC outbound channel closed"); + } + } + } + } + BridgeEvent::StreamStarted { title } => { + let msg = if title.is_empty() { + format!("Stream started — {}", owncast_url) + } else { + format!("Stream started: {} — {}", title, owncast_url) + }; + let _ = irc_tx.send(msg).await; + } + BridgeEvent::StreamStopped => { + let _ = irc_tx.send("Stream ended.".to_string()).await; + } + } +} + +async fn handle_state_change( + new: &OwncastState, + _old: &OwncastState, + irc_tx: &mpsc::Sender, +) { + match new { + OwncastState::Online => { + let _ = irc_tx.send("Owncast chat is now available.".to_string()).await; + } + OwncastState::Unavailable => { + let _ = irc_tx.send("Owncast chat is currently unavailable.".to_string()).await; + } + OwncastState::OfflineChatOpen => {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dedup_tracker_new_id() { + let mut tracker = DedupTracker::new(100); + assert!(!tracker.is_duplicate("msg-1")); + } + + #[test] + fn test_dedup_tracker_duplicate() { + let mut tracker = DedupTracker::new(100); + assert!(!tracker.is_duplicate("msg-1")); + assert!(tracker.is_duplicate("msg-1")); + } + + #[test] + fn test_dedup_tracker_evicts_old() { + let mut tracker = DedupTracker::new(2); + tracker.is_duplicate("a"); + tracker.is_duplicate("b"); + tracker.is_duplicate("c"); + assert!(!tracker.is_duplicate("a")); + } + + #[test] + fn test_echo_suppressor() { + let mut suppressor = EchoSuppressor::new(10); + suppressor.record_sent("hello from IRC"); + assert!(suppressor.is_echo("hello from IRC")); + assert!(!suppressor.is_echo("different message")); + } +}