feat: add router with dedup, echo suppression, and state handling

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-10 21:59:54 -04:00
parent cc806c55e7
commit a7b80264d0
2 changed files with 237 additions and 0 deletions

View File

@@ -5,6 +5,7 @@ mod health;
mod html;
mod irc_task;
mod owncast_api;
mod router;
mod webhook;
mod websocket;

236
src/router.rs Normal file
View File

@@ -0,0 +1,236 @@
use std::collections::{HashSet, VecDeque};
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{info, warn};
use crate::config::BridgeSettings;
use crate::events::{BridgeEvent, ControlCommand, OwncastState, Source};
use crate::owncast_api::OwncastApiClient;
pub struct DedupTracker {
seen: VecDeque<String>,
set: HashSet<String>,
capacity: usize,
}
impl DedupTracker {
pub fn new(capacity: usize) -> Self {
Self {
seen: VecDeque::with_capacity(capacity),
set: HashSet::with_capacity(capacity),
capacity,
}
}
pub fn is_duplicate(&mut self, id: &str) -> bool {
if self.set.contains(id) {
return true;
}
if self.seen.len() >= self.capacity {
if let Some(old) = self.seen.pop_front() {
self.set.remove(&old);
}
}
self.set.insert(id.to_string());
self.seen.push_back(id.to_string());
false
}
}
pub struct EchoSuppressor {
recent: VecDeque<String>,
capacity: usize,
}
impl EchoSuppressor {
pub fn new(capacity: usize) -> Self {
Self {
recent: VecDeque::with_capacity(capacity),
capacity,
}
}
pub fn record_sent(&mut self, body: &str) {
if self.recent.len() >= self.capacity {
self.recent.pop_front();
}
self.recent.push_back(body.to_string());
}
pub fn is_echo(&mut self, body: &str) -> bool {
if let Some(pos) = self.recent.iter().position(|s| s == body) {
self.recent.remove(pos);
true
} else {
false
}
}
}
pub async fn run_router(
settings: BridgeSettings,
owncast_url: String,
api_client: OwncastApiClient,
mut event_rx: mpsc::Receiver<BridgeEvent>,
irc_outbound_tx: mpsc::Sender<String>,
mut state_rx: mpsc::Receiver<OwncastState>,
mut control_rx: mpsc::Receiver<ControlCommand>,
shutdown_tx: tokio::sync::watch::Sender<bool>,
start_time: Instant,
) {
let mut dedup = DedupTracker::new(500);
let mut echo_suppressor = EchoSuppressor::new(50);
let mut owncast_state = OwncastState::Unavailable;
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
handle_event(
event,
&settings,
&owncast_url,
&api_client,
&irc_outbound_tx,
&mut dedup,
&mut echo_suppressor,
&owncast_state,
).await;
}
Some(new_state) = state_rx.recv() => {
if new_state != owncast_state {
handle_state_change(&new_state, &owncast_state, &irc_outbound_tx).await;
owncast_state = new_state;
}
}
Some(cmd) = control_rx.recv() => {
match cmd {
ControlCommand::Quit => {
info!("Quit command received, shutting down");
let _ = shutdown_tx.send(true);
return;
}
ControlCommand::Status { reply } => {
let status = crate::events::BridgeStatus {
irc_connected: true,
owncast_state: format!("{:?}", owncast_state),
webhook_listening: true,
websocket_connected: false,
uptime_secs: start_time.elapsed().as_secs(),
};
let _ = reply.send(status);
}
other => {
warn!(command = ?other, "Control command not yet implemented in router");
}
}
}
else => break,
}
}
}
async fn handle_event(
event: BridgeEvent,
settings: &BridgeSettings,
owncast_url: &str,
api_client: &OwncastApiClient,
irc_tx: &mpsc::Sender<String>,
dedup: &mut DedupTracker,
echo: &mut EchoSuppressor,
owncast_state: &OwncastState,
) {
match event {
BridgeEvent::ChatMessage { source, username, body, id } => {
if let Some(ref msg_id) = id {
if dedup.is_duplicate(msg_id) {
return;
}
}
match source {
Source::Irc => {
if *owncast_state == OwncastState::Unavailable {
return;
}
let formatted = format!("{} <{}> {}", settings.irc_prefix, username, body);
echo.record_sent(&formatted);
if let Err(e) = api_client.send_chat_message(&formatted).await {
warn!(error = %e, "Failed to send to Owncast");
}
}
Source::Owncast => {
let formatted = format!("{} <{}> {}", settings.owncast_prefix, username, body);
if echo.is_echo(&body) {
return;
}
if irc_tx.send(formatted).await.is_err() {
warn!("IRC outbound channel closed");
}
}
}
}
BridgeEvent::StreamStarted { title } => {
let msg = if title.is_empty() {
format!("Stream started — {}", owncast_url)
} else {
format!("Stream started: {}{}", title, owncast_url)
};
let _ = irc_tx.send(msg).await;
}
BridgeEvent::StreamStopped => {
let _ = irc_tx.send("Stream ended.".to_string()).await;
}
}
}
async fn handle_state_change(
new: &OwncastState,
_old: &OwncastState,
irc_tx: &mpsc::Sender<String>,
) {
match new {
OwncastState::Online => {
let _ = irc_tx.send("Owncast chat is now available.".to_string()).await;
}
OwncastState::Unavailable => {
let _ = irc_tx.send("Owncast chat is currently unavailable.".to_string()).await;
}
OwncastState::OfflineChatOpen => {}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dedup_tracker_new_id() {
let mut tracker = DedupTracker::new(100);
assert!(!tracker.is_duplicate("msg-1"));
}
#[test]
fn test_dedup_tracker_duplicate() {
let mut tracker = DedupTracker::new(100);
assert!(!tracker.is_duplicate("msg-1"));
assert!(tracker.is_duplicate("msg-1"));
}
#[test]
fn test_dedup_tracker_evicts_old() {
let mut tracker = DedupTracker::new(2);
tracker.is_duplicate("a");
tracker.is_duplicate("b");
tracker.is_duplicate("c");
assert!(!tracker.is_duplicate("a"));
}
#[test]
fn test_echo_suppressor() {
let mut suppressor = EchoSuppressor::new(10);
suppressor.record_sent("hello from IRC");
assert!(suppressor.is_echo("hello from IRC"));
assert!(!suppressor.is_echo("different message"));
}
}