# Owncast–IRC Bridge Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a bidirectional Rust chat bridge between Owncast (`https://owncast.bowlafterbowl.com`) and IRC (`irc.zeronode.net` / `#BowlAfterBowl`).

**Architecture:** Single tokio async binary with concurrent tasks (IRC, Webhook, WebSocket, Health Poller, Router, Control Socket) communicating via mpsc channels. A separate `bridge-ctl` CLI binary talks to the daemon over a Unix socket.

**Tech Stack:** Rust, tokio, axum, irc crate, tokio-tungstenite, reqwest, serde/toml, tracing, clap

**Design doc:** `docs/plans/2026-03-10-owncast-irc-bridge-design.md`

---

### Task 1: Project Scaffolding

**Files:**
- Create: `Cargo.toml`
- Create: `src/main.rs`
- Create: `src/bin/bridge_ctl.rs`
- Create: `config.example.toml`
- Create: `.gitignore`

**Step 1: Initialize Cargo project**

Run:

```bash
cargo init --name owncast-irc-bridge
```

**Step 2: Set up Cargo.toml with all dependencies**

Replace `Cargo.toml` with:

```toml
[package]
name = "owncast-irc-bridge"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "owncast-irc-bridge"
path = "src/main.rs"

[[bin]]
name = "bridge-ctl"
path = "src/bin/bridge_ctl.rs"

[dependencies]
tokio = { version = "1", features = ["full"] }
axum = "0.8"
irc = "1"
tokio-tungstenite = { version = "0.26", features = ["native-tls"] }
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
clap = { version = "4", features = ["derive"] }
thiserror = "2"
anyhow = "1"
futures-util = "0.3"

[dev-dependencies]
tokio-test = "0.4"
```

**Step 3: Create `.gitignore`**

```gitignore
/target
.env
config.toml
```

**Step 4: Create `config.example.toml`**

```toml
[irc]
server = "irc.zeronode.net"
port = 6667
tls = false
nick = "owncast-bridge"
channel = "#BowlAfterBowl"

[owncast]
url = "https://owncast.bowlafterbowl.com"
# Set OWNCAST_ACCESS_TOKEN env var for the token
webhook_port = 9078
websocket_enabled = true
health_poll_interval_secs = 30

[bridge]
irc_prefix = "[IRC]"
owncast_prefix = "[OC]"
message_buffer_size = 0

[control]
socket_path = "/tmp/owncast-irc-bridge.sock"
```

**Step 5: Create placeholder `src/main.rs`**

```rust
fn main() {
    println!("owncast-irc-bridge");
}
```

**Step 6: Create placeholder `src/bin/bridge_ctl.rs`**

```rust
fn main() {
    println!("bridge-ctl");
}
```

**Step 7: Verify it compiles**

Run: `cargo check`
Expected: compiles with no errors (warnings about unused deps are fine)

**Step 8: Commit**

```bash
git init
git add -A
git commit -m "feat: scaffold project with dependencies and config"
```

---

### Task 2: Config Module

**Files:**
- Create: `src/config.rs`
- Modify: `src/main.rs` (add `mod config;`)

**Step 1: Write tests for config loading**

At the bottom of `src/config.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_minimal_config() {
        // `r##"…"##` is required: the channel value contains `"#`, which
        // would terminate a single-hash raw string early.
        let toml_str = r##"
            [irc]
            server = "irc.example.com"
            channel = "#test"

            [owncast]
            url = "https://owncast.example.com"
        "##;
        let config: BridgeConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.irc.server, "irc.example.com");
        assert_eq!(config.irc.port, 6667);
        assert_eq!(config.irc.tls, false);
        assert_eq!(config.irc.nick, "owncast-bridge");
        assert_eq!(config.irc.channel, "#test");
        assert_eq!(config.owncast.url, "https://owncast.example.com");
        assert_eq!(config.owncast.webhook_port, 9078);
        assert_eq!(config.owncast.websocket_enabled, false);
        assert_eq!(config.owncast.health_poll_interval_secs, 30);
        assert_eq!(config.bridge.irc_prefix, "[IRC]");
        assert_eq!(config.bridge.owncast_prefix, "[OC]");
        assert_eq!(config.bridge.message_buffer_size, 0);
        assert_eq!(config.control.socket_path, "/tmp/owncast-irc-bridge.sock");
    }

    #[test]
    fn test_parse_full_config() {
        let toml_str = r##"
            [irc]
            server = "irc.example.com"
            port = 6697
            tls = true
            nick = "mybot"
            channel = "#mychan"

            [owncast]
            url = "https://oc.example.com"
            webhook_port = 8888
            websocket_enabled = true
            health_poll_interval_secs = 10

            [bridge]
            irc_prefix = ""
            owncast_prefix = ""
            message_buffer_size = 50

            [control]
            socket_path = "/var/run/bridge.sock"
        "##;
        let config: BridgeConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.irc.port, 6697);
        assert!(config.irc.tls);
        assert_eq!(config.irc.nick, "mybot");
        assert_eq!(config.owncast.webhook_port, 8888);
        assert!(config.owncast.websocket_enabled);
        assert_eq!(config.bridge.message_buffer_size, 50);
        assert_eq!(config.control.socket_path, "/var/run/bridge.sock");
    }

    #[test]
    fn test_access_token_from_env() {
        std::env::set_var("OWNCAST_ACCESS_TOKEN", "test-token-123");
        let config = BridgeConfig::default_for_test();
        let token = config.owncast_access_token();
        assert_eq!(token.unwrap(), "test-token-123");
        std::env::remove_var("OWNCAST_ACCESS_TOKEN");
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test config` (the package has no library target, so `--lib` would be rejected)
Expected: FAIL (`BridgeConfig` not defined)

**Step 3: Implement config structs**

Write `src/config.rs`:

```rust
use std::path::Path;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct BridgeConfig {
    pub irc: IrcConfig,
    pub owncast: OwncastConfig,
    #[serde(default)]
    pub bridge: BridgeSettings,
    #[serde(default)]
    pub control: ControlConfig,
}

#[derive(Debug, Deserialize)]
pub struct IrcConfig {
    pub server: String,
    #[serde(default = "default_irc_port")]
    pub port: u16,
    #[serde(default)]
    pub tls: bool,
    #[serde(default = "default_nick")]
    pub nick: String,
    pub channel: String,
}

#[derive(Debug, Deserialize)]
pub struct OwncastConfig {
    pub url: String,
    #[serde(default = "default_webhook_port")]
    pub webhook_port: u16,
    #[serde(default)]
    pub websocket_enabled: bool,
    #[serde(default = "default_health_poll_interval")]
    pub health_poll_interval_secs: u64,
}

#[derive(Debug, Deserialize)]
pub struct BridgeSettings {
    #[serde(default = "default_irc_prefix")]
    pub irc_prefix: String,
    #[serde(default = "default_owncast_prefix")]
    pub owncast_prefix: String,
    #[serde(default)]
    pub message_buffer_size: usize,
}

#[derive(Debug, Deserialize)]
pub struct ControlConfig {
    #[serde(default = "default_socket_path")]
    pub socket_path: String,
}

impl BridgeConfig {
    /// Reads and parses the TOML config file at `path`.
    pub fn load(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        let config: BridgeConfig = toml::from_str(&contents)?;
        Ok(config)
    }

    /// The access token is never stored in the config file; it is read
    /// from the environment on demand.
    pub fn owncast_access_token(&self) -> anyhow::Result<String> {
        std::env::var("OWNCAST_ACCESS_TOKEN")
            .map_err(|_| anyhow::anyhow!("OWNCAST_ACCESS_TOKEN env var not set"))
    }
}

fn default_irc_port() -> u16 {
    6667
}

fn default_nick() -> String {
    "owncast-bridge".to_string()
}

fn default_webhook_port() -> u16 {
    9078
}

fn default_health_poll_interval() -> u64 {
    30
}

fn default_irc_prefix() -> String {
    "[IRC]".to_string()
}

fn default_owncast_prefix() -> String {
    "[OC]".to_string()
}

fn default_socket_path() -> String {
    "/tmp/owncast-irc-bridge.sock".to_string()
}

// Used when the whole `[bridge]` table is omitted.
impl Default for BridgeSettings {
    fn default() -> Self {
        Self {
            irc_prefix: default_irc_prefix(),
            owncast_prefix: default_owncast_prefix(),
            message_buffer_size: 0,
        }
    }
}

// Used when the whole `[control]` table is omitted.
impl Default for ControlConfig {
    fn default() -> Self {
        Self {
            socket_path: default_socket_path(),
        }
    }
}

#[cfg(test)]
impl BridgeConfig {
    pub fn default_for_test() -> Self {
        Self {
            irc: IrcConfig {
                server: "localhost".to_string(),
                port: 6667,
                tls: false,
                nick: "test-bot".to_string(),
                channel: "#test".to_string(),
            },
            owncast: OwncastConfig {
                url: "http://localhost:8080".to_string(),
                webhook_port: 9078,
                websocket_enabled: false,
                health_poll_interval_secs: 30,
            },
            bridge: BridgeSettings::default(),
            control: ControlConfig::default(),
        }
    }
}
```

**Step 4: Add module to main.rs**

Add `mod config;` to `src/main.rs`.
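`test_access_token_from_env` mutates the process environment, which is shared by every test thread, and `cargo test` runs tests in parallel by default. One way to keep the token lookup deterministic under test is to inject the lookup function. The sketch below is only an illustration under that assumption: `access_token_from` is a hypothetical helper that is not part of this plan, and it returns a plain `String` error instead of `anyhow::Error` so that it needs only the standard library.

```rust
use std::env::VarError;

/// Hypothetical helper (not part of the plan): resolves the access token
/// through an injected lookup function instead of reading the process
/// environment directly, so tests never have to call `set_var`.
pub fn access_token_from<F>(lookup: F) -> Result<String, String>
where
    F: Fn(&str) -> Result<String, VarError>,
{
    // Same variable name and error text as `owncast_access_token`.
    lookup("OWNCAST_ACCESS_TOKEN")
        .map_err(|_| "OWNCAST_ACCESS_TOKEN env var not set".to_string())
}
```

Production code would pass the real environment, for example `access_token_from(|key| std::env::var(key))`, while a test can pass a closure that returns a fixed value or `Err(VarError::NotPresent)`.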
**Step 5: Run tests to verify they pass**

Run: `cargo test config` (the package has no library target, so `--lib` would be rejected)
Expected: 3 tests PASS

**Step 6: Commit**

```bash
git add -A
git commit -m "feat: add config module with TOML parsing and env var support"
```

---

### Task 3: Events Module (Core Types)

**Files:**
- Create: `src/events.rs`
- Modify: `src/main.rs` (add `mod events;`)

**Step 1: Write `src/events.rs`**

```rust
use tokio::sync::oneshot;

/// Which side of the bridge a chat message originated from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Source {
    Irc,
    Owncast,
}

#[derive(Debug, Clone)]
pub enum BridgeEvent {
    ChatMessage {
        source: Source,
        username: String,
        body: String,
        /// Owncast message ID; `None` for IRC messages.
        id: Option<String>,
    },
    StreamStarted {
        title: String,
    },
    StreamStopped,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwncastState {
    Online,
    OfflineChatOpen,
    Unavailable,
}

#[derive(Debug)]
pub enum ControlCommand {
    IrcConnect,
    IrcDisconnect,
    IrcReconnect,
    OwncastConnect,
    OwncastDisconnect,
    OwncastReconnect,
    Status {
        /// One-shot channel on which the router sends back the status.
        reply: oneshot::Sender<BridgeStatus>,
    },
    Quit,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct BridgeStatus {
    pub irc_connected: bool,
    pub owncast_state: String,
    pub webhook_listening: bool,
    pub websocket_connected: bool,
    pub uptime_secs: u64,
}
```

**Step 2: Add module to `src/main.rs`**

Add `mod events;`.
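To illustrate how these types are meant to be consumed, here is a minimal, std-only sketch of matching on a `BridgeEvent` and choosing a prefix from its `Source`. It mirrors the `"{} <{}> {}"` format that the router in Task 11 uses. The `format_chat` function itself is hypothetical and exists only for this example; the enums are trimmed copies of the ones above so the block compiles on its own.

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Source {
    Irc,
    Owncast,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum BridgeEvent {
    ChatMessage {
        source: Source,
        username: String,
        body: String,
        id: Option<String>,
    },
    StreamStarted {
        title: String,
    },
    StreamStopped,
}

/// Hypothetical helper: renders a chat event as a relay line, using the
/// prefix that belongs to the side the message came from. Stream events
/// carry no chat text, so they yield `None`.
pub fn format_chat(
    event: &BridgeEvent,
    irc_prefix: &str,
    owncast_prefix: &str,
) -> Option<String> {
    match event {
        BridgeEvent::ChatMessage { source, username, body, .. } => {
            let prefix = match source {
                Source::Irc => irc_prefix,
                Source::Owncast => owncast_prefix,
            };
            Some(format!("{} <{}> {}", prefix, username, body))
        }
        BridgeEvent::StreamStarted { .. } | BridgeEvent::StreamStopped => None,
    }
}
```

Because the `match` is exhaustive, adding a new `BridgeEvent` variant later makes the compiler flag every place that still needs to handle it.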
**Step 3: Verify it compiles**

Run: `cargo check`
Expected: compiles

**Step 4: Commit**

```bash
git add -A
git commit -m "feat: add events module with BridgeEvent, ControlCommand, OwncastState types"
```

---

### Task 4: HTML Stripping Utility

**Files:**
- Create: `src/html.rs`
- Modify: `src/main.rs` (add `mod html;`)

**Step 1: Write tests**

At the bottom of `src/html.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_plain_text_unchanged() {
        assert_eq!(strip_html("hello world"), "hello world");
    }

    #[test]
    fn test_strips_basic_tags() {
        assert_eq!(strip_html("<b>bold</b> text"), "bold text");
    }

    #[test]
    fn test_emoji_img_to_alt_text() {
        // Illustrative markup; the `src` path is a placeholder.
        let input = r#"hello <img src="/img/emoji/beerparrot.gif" alt=":beerparrot:"> world"#;
        assert_eq!(strip_html(input), "hello :beerparrot: world");
    }

    #[test]
    fn test_multiple_emoji() {
        let input = r#"<img alt=":a:"><img alt=":b:">"#;
        assert_eq!(strip_html(input), ":a::b:");
    }

    #[test]
    fn test_strips_links() {
        // Illustrative markup; the `href` is a placeholder.
        let input = r#"check <a href="https://example.com">this link</a>"#;
        assert_eq!(strip_html(input), "check this link");
    }

    #[test]
    fn test_decodes_html_entities() {
        assert_eq!(strip_html("a &amp; b &lt; c"), "a & b < c");
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test html` (the package has no library target, so `--lib` would be rejected)
Expected: FAIL

**Step 3: Implement `strip_html`**

```rust
use std::borrow::Cow;

/// Extracts alt text from `<img>` tags (for Owncast emoji) and strips all other HTML.
pub fn strip_html(input: &str) -> String {
    let mut result = String::with_capacity(input.len());
    let mut chars = input.chars().peekable();
    while let Some(&ch) = chars.peek() {
        if ch == '<' {
            // `peek` does not advance, so consume '<' first; otherwise
            // `tag` would start with '<' and never match "img ".
            chars.next();
            let tag: String = chars.by_ref().take_while(|&c| c != '>').collect();
            if tag.starts_with("img ") || tag.starts_with("img\t") {
                if let Some(alt) = extract_attr(&tag, "alt") {
                    result.push_str(&alt);
                }
            }
        } else if ch == '&' {
            // Consume the peeked '&'; `once(ch)` puts it back at the
            // front so the entity reads e.g. "&amp" (no trailing ';').
            chars.next();
            let entity: String = std::iter::once(ch)
                .chain(chars.by_ref().take_while(|&c| c != ';'))
                .collect();
            result.push_str(&decode_entity(&entity));
        } else {
            result.push(ch);
            chars.next();
        }
    }
    result
}

/// Returns the value of a double-quoted attribute, e.g. `alt=":smile:"`.
fn extract_attr(tag: &str, attr_name: &str) -> Option<String> {
    let pattern = format!("{}=\"", attr_name);
    let start = tag.find(&pattern)? + pattern.len();
    let rest = &tag[start..];
    let end = rest.find('"')?;
    Some(rest[..end].to_string())
}

/// Decodes a small set of entities. `entity` arrives without its
/// terminating ';'. Unknown input is returned unchanged.
fn decode_entity(entity: &str) -> Cow<'static, str> {
    match entity {
        "&amp" => Cow::Borrowed("&"),
        "&lt" => Cow::Borrowed("<"),
        "&gt" => Cow::Borrowed(">"),
        "&quot" => Cow::Borrowed("\""),
        "&#39" | "&apos" => Cow::Borrowed("'"),
        "&nbsp" => Cow::Borrowed(" "),
        other => Cow::Owned(other.to_string()),
    }
}
```

**Step 4: Run tests**

Run: `cargo test html`
Expected: all PASS

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: add HTML stripping utility for Owncast emoji and markup"
```

---

### Task 5: Owncast API Sender

**Files:**
- Create: `src/owncast_api.rs`
- Modify: `src/main.rs` (add `mod owncast_api;`)

**Step 1: Implement the Owncast API client**

```rust
use reqwest::Client;
use tracing::{error, warn};

pub struct OwncastApiClient {
    client: Client,
    base_url: String,
    access_token: String,
}

impl OwncastApiClient {
    pub fn new(base_url: String, access_token: String) -> Self {
        Self {
            client: Client::new(),
            base_url: base_url.trim_end_matches('/').to_string(),
            access_token,
        }
    }

    pub async fn send_chat_message(&self, body: &str) -> anyhow::Result<()> {
        let url = format!("{}/api/integrations/chat/send", self.base_url);
        let resp = self
            .client
.post(&url) .bearer_auth(&self.access_token) .json(&serde_json::json!({ "body": body })) .send() .await?; let status = resp.status(); if status.is_success() { return Ok(()); } let resp_body = resp.text().await.unwrap_or_default(); if status.is_client_error() { error!(status = %status, body = %resp_body, "Owncast API client error (not retrying)"); anyhow::bail!("Owncast API returned {status}"); } warn!(status = %status, body = %resp_body, "Owncast API server error, retrying once"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; let retry_resp = self .client .post(&url) .bearer_auth(&self.access_token) .json(&serde_json::json!({ "body": body })) .send() .await?; if retry_resp.status().is_success() { Ok(()) } else { let retry_body = retry_resp.text().await.unwrap_or_default(); error!(status = %retry_resp.status(), body = %retry_body, "Owncast API retry failed"); anyhow::bail!("Owncast API retry returned {}", retry_resp.status()) } } pub async fn get_status(&self) -> anyhow::Result { let url = format!("{}/api/status", self.base_url); let resp = self.client.get(&url).send().await?; let status: OwncastStatus = resp.json().await?; Ok(status) } } #[derive(Debug, serde::Deserialize)] pub struct OwncastStatus { pub online: bool, #[serde(default)] #[serde(rename = "streamTitle")] pub stream_title: Option, #[serde(default)] #[serde(rename = "viewerCount")] pub viewer_count: Option, } ``` **Step 2: Add module and verify compilation** Run: `cargo check` Expected: compiles **Step 3: Commit** ```bash git add -A git commit -m "feat: add Owncast API client for sending messages and checking status" ``` --- ### Task 6: Health Poller **Files:** - Create: `src/health.rs` - Modify: `src/main.rs` (add `mod health;`) **Step 1: Implement health poller** ```rust use std::time::Duration; use tokio::sync::mpsc; use tracing::{info, warn}; use crate::events::OwncastState; use crate::owncast_api::OwncastApiClient; pub async fn run_health_poller( api_client: &OwncastApiClient, 
state_tx: mpsc::Sender, interval: Duration, mut shutdown: tokio::sync::watch::Receiver, ) { let mut current_state = OwncastState::Unavailable; loop { tokio::select! { _ = tokio::time::sleep(interval) => {}, _ = shutdown.changed() => { info!("Health poller shutting down"); return; } } let new_state = match api_client.get_status().await { Ok(status) => { if status.online { OwncastState::Online } else { OwncastState::OfflineChatOpen } } Err(e) => { warn!(error = %e, "Failed to poll Owncast status"); OwncastState::Unavailable } }; if new_state != current_state { info!(old = ?current_state, new = ?new_state, "Owncast state changed"); current_state = new_state.clone(); if state_tx.send(new_state).await.is_err() { return; } } } } ``` **Step 2: Add module and verify compilation** Run: `cargo check` Expected: compiles **Step 3: Commit** ```bash git add -A git commit -m "feat: add Owncast health poller with state change detection" ``` --- ### Task 7: Webhook HTTP Server **Files:** - Create: `src/webhook.rs` - Modify: `src/main.rs` (add `mod webhook;`) **Step 1: Write tests for webhook payload parsing** At the bottom of `src/webhook.rs`: ```rust #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_chat_event() { let json = serde_json::json!({ "type": "CHAT", "eventData": { "user": { "displayName": "viewer42", "isBot": false }, "body": "hello world", "id": "abc123", "visible": true } }); let payload: WebhookPayload = serde_json::from_value(json).unwrap(); let event = payload.into_bridge_event(); assert!(event.is_some()); if let Some(BridgeEvent::ChatMessage { source, username, body, id }) = event { assert_eq!(source, Source::Owncast); assert_eq!(username, "viewer42"); assert_eq!(body, "hello world"); assert_eq!(id, Some("abc123".to_string())); } else { panic!("Expected ChatMessage"); } } #[test] fn test_parse_stream_started() { let json = serde_json::json!({ "type": "STREAM_STARTED", "eventData": { "id": "x", "name": "Test", "streamTitle": "Friday bowls", "summary": 
"", "timestamp": "2026-01-01T00:00:00Z" } }); let payload: WebhookPayload = serde_json::from_value(json).unwrap(); let event = payload.into_bridge_event(); assert!(matches!(event, Some(BridgeEvent::StreamStarted { title }) if title == "Friday bowls")); } #[test] fn test_parse_stream_stopped() { let json = serde_json::json!({ "type": "STREAM_STOPPED", "eventData": { "id": "x", "name": "Test", "streamTitle": "", "summary": "", "timestamp": "2026-01-01T00:00:00Z" } }); let payload: WebhookPayload = serde_json::from_value(json).unwrap(); let event = payload.into_bridge_event(); assert!(matches!(event, Some(BridgeEvent::StreamStopped))); } #[test] fn test_ignores_bot_messages() { let json = serde_json::json!({ "type": "CHAT", "eventData": { "user": { "displayName": "bot", "isBot": true }, "body": "automated message", "id": "x", "visible": true } }); let payload: WebhookPayload = serde_json::from_value(json).unwrap(); let event = payload.into_bridge_event(); assert!(event.is_none()); } #[test] fn test_ignores_unknown_event_type() { let json = serde_json::json!({ "type": "USER_JOINED", "eventData": { "id": "x" } }); let payload: WebhookPayload = serde_json::from_value(json).unwrap(); let event = payload.into_bridge_event(); assert!(event.is_none()); } } ``` **Step 2: Run tests to verify they fail** Run: `cargo test --lib webhook` Expected: FAIL **Step 3: Implement webhook server** ```rust use axum::{extract::State, http::StatusCode, routing::post, Json, Router}; use serde::Deserialize; use tokio::sync::mpsc; use tracing::{info, warn}; use crate::events::{BridgeEvent, Source}; use crate::html::strip_html; #[derive(Debug, Deserialize)] pub struct WebhookPayload { #[serde(rename = "type")] pub event_type: String, #[serde(rename = "eventData")] pub event_data: serde_json::Value, } #[derive(Debug, Deserialize)] struct ChatEventData { user: ChatUser, body: String, id: String, #[serde(default = "default_visible")] visible: bool, } fn default_visible() -> bool { true } 
#[derive(Debug, Deserialize)] struct ChatUser { #[serde(rename = "displayName")] display_name: String, #[serde(default, rename = "isBot")] is_bot: bool, } #[derive(Debug, Deserialize)] struct StreamEventData { #[serde(default, rename = "streamTitle")] stream_title: Option, } impl WebhookPayload { pub fn into_bridge_event(self) -> Option { match self.event_type.as_str() { "CHAT" => { let data: ChatEventData = serde_json::from_value(self.event_data).ok()?; if data.user.is_bot || !data.visible { return None; } Some(BridgeEvent::ChatMessage { source: Source::Owncast, username: data.user.display_name, body: strip_html(&data.body), id: Some(data.id), }) } "STREAM_STARTED" => { let data: StreamEventData = serde_json::from_value(self.event_data).ok()?; Some(BridgeEvent::StreamStarted { title: data.stream_title.unwrap_or_default(), }) } "STREAM_STOPPED" => Some(BridgeEvent::StreamStopped), _ => None, } } } #[derive(Clone)] struct WebhookState { event_tx: mpsc::Sender, } async fn handle_webhook( State(state): State, Json(payload): Json, ) -> StatusCode { info!(event_type = %payload.event_type, "Received webhook"); match payload.into_bridge_event() { Some(event) => { if state.event_tx.send(event).await.is_err() { warn!("Router channel closed"); return StatusCode::INTERNAL_SERVER_ERROR; } StatusCode::OK } None => StatusCode::OK, } } pub async fn run_webhook_server( port: u16, event_tx: mpsc::Sender, ) -> anyhow::Result<()> { let state = WebhookState { event_tx }; let app = Router::new() .route("/webhook", post(handle_webhook)) .with_state(state); let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); info!(%addr, "Starting webhook server"); let listener = tokio::net::TcpListener::bind(addr).await?; axum::serve(listener, app).await?; Ok(()) } ``` **Step 4: Run tests** Run: `cargo test --lib webhook` Expected: all PASS **Step 5: Commit** ```bash git add -A git commit -m "feat: add webhook server with payload parsing and chat/stream event handling" ``` --- ### Task 8: IRC 
Task **Files:** - Create: `src/irc_task.rs` - Modify: `src/main.rs` (add `mod irc_task;`) **Step 1: Implement IRC task** ```rust use std::time::Duration; use futures_util::StreamExt; use irc::client::prelude::*; use tokio::sync::mpsc; use tracing::{error, info, warn}; use crate::config::IrcConfig; use crate::events::{BridgeEvent, Source}; pub async fn run_irc_task( config: IrcConfig, event_tx: mpsc::Sender, mut outbound_rx: mpsc::Receiver, mut shutdown: tokio::sync::watch::Receiver, ) { let mut backoff = Duration::from_secs(1); let max_backoff = Duration::from_secs(60); loop { info!(server = %config.server, channel = %config.channel, "Connecting to IRC"); match connect_and_run(&config, &event_tx, &mut outbound_rx, &mut shutdown).await { Ok(()) => { info!("IRC task exiting cleanly"); return; } Err(e) => { error!(error = %e, "IRC connection error"); info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff"); tokio::select! { _ = tokio::time::sleep(backoff) => {}, _ = shutdown.changed() => return, } backoff = (backoff * 2).min(max_backoff); } } } } async fn connect_and_run( config: &IrcConfig, event_tx: &mpsc::Sender, outbound_rx: &mut mpsc::Receiver, shutdown: &mut tokio::sync::watch::Receiver, ) -> anyhow::Result<()> { let irc_config = Config { nickname: Some(config.nick.clone()), server: Some(config.server.clone()), port: Some(config.port), use_tls: Some(config.tls), channels: vec![config.channel.clone()], ..Config::default() }; let mut client = Client::from_config(irc_config).await?; client.identify()?; let mut stream = client.stream()?; let sender = client.sender(); info!("IRC connected, waiting for messages"); loop { tokio::select! 
{ msg = stream.next() => { match msg { Some(Ok(message)) => { if let Command::PRIVMSG(ref target, ref text) = message.command { if target == &config.channel { let nick = message.source_nickname().unwrap_or("unknown").to_string(); let event = BridgeEvent::ChatMessage { source: Source::Irc, username: nick, body: text.clone(), id: None, }; if event_tx.send(event).await.is_err() { return Ok(()); } } } } Some(Err(e)) => return Err(e.into()), None => return Err(anyhow::anyhow!("IRC stream ended")), } } Some(text) = outbound_rx.recv() => { sender.send_privmsg(&config.channel, &text)?; } _ = shutdown.changed() => { let _ = sender.send_quit("Bridge shutting down"); return Ok(()); } } } } ``` **Step 2: Add module and verify compilation** Run: `cargo check` Expected: compiles **Step 3: Commit** ```bash git add -A git commit -m "feat: add IRC task with auto-reconnect and backoff" ``` --- ### Task 9: WebSocket Task **Files:** - Create: `src/websocket.rs` - Modify: `src/main.rs` (add `mod websocket;`) **Step 1: Implement WebSocket task** ```rust use std::time::Duration; use futures_util::{SinkExt, StreamExt}; use tokio::sync::mpsc; use tokio_tungstenite::connect_async; use tracing::{error, info, warn}; use crate::events::{BridgeEvent, Source}; use crate::html::strip_html; pub async fn run_websocket_task( owncast_url: String, event_tx: mpsc::Sender, mut shutdown: tokio::sync::watch::Receiver, ) { let mut backoff = Duration::from_secs(1); let max_backoff = Duration::from_secs(60); loop { let ws_url = build_ws_url(&owncast_url); info!(url = %ws_url, "Connecting to Owncast WebSocket"); match connect_and_listen(&ws_url, &event_tx, &mut shutdown).await { Ok(()) => { info!("WebSocket task exiting cleanly"); return; } Err(e) => { warn!(error = %e, "WebSocket connection error"); info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff"); tokio::select! 
{ _ = tokio::time::sleep(backoff) => {}, _ = shutdown.changed() => return, } backoff = (backoff * 2).min(max_backoff); } } } } fn build_ws_url(base_url: &str) -> String { let base = base_url.trim_end_matches('/'); let ws_base = if base.starts_with("https://") { base.replacen("https://", "wss://", 1) } else { base.replacen("http://", "ws://", 1) }; format!("{}/ws", ws_base) } async fn connect_and_listen( ws_url: &str, event_tx: &mpsc::Sender, shutdown: &mut tokio::sync::watch::Receiver, ) -> anyhow::Result<()> { let (ws_stream, _) = connect_async(ws_url).await?; let (mut _write, mut read) = ws_stream.split(); info!("WebSocket connected"); loop { tokio::select! { msg = read.next() => { match msg { Some(Ok(ws_msg)) => { if let Ok(text) = ws_msg.into_text() { if let Some(event) = parse_ws_message(&text) { if event_tx.send(event).await.is_err() { return Ok(()); } } } } Some(Err(e)) => return Err(e.into()), None => return Err(anyhow::anyhow!("WebSocket stream ended")), } } _ = shutdown.changed() => return Ok(()), } } } fn parse_ws_message(text: &str) -> Option { let value: serde_json::Value = serde_json::from_str(text).ok()?; let msg_type = value.get("type")?.as_str()?; match msg_type { "CHAT" => { let user = value.get("user")?; let is_bot = user.get("isBot").and_then(|v| v.as_bool()).unwrap_or(false); if is_bot { return None; } let display_name = user.get("displayName")?.as_str()?.to_string(); let body = value.get("body")?.as_str()?; let id = value.get("id").and_then(|v| v.as_str()).map(String::from); Some(BridgeEvent::ChatMessage { source: Source::Owncast, username: display_name, body: strip_html(body), id, }) } _ => None, } } #[cfg(test)] mod tests { use super::*; #[test] fn test_build_ws_url_https() { assert_eq!( build_ws_url("https://owncast.example.com"), "wss://owncast.example.com/ws" ); } #[test] fn test_build_ws_url_http() { assert_eq!( build_ws_url("http://localhost:8080"), "ws://localhost:8080/ws" ); } #[test] fn test_build_ws_url_trailing_slash() { 
assert_eq!( build_ws_url("https://owncast.example.com/"), "wss://owncast.example.com/ws" ); } #[test] fn test_parse_ws_chat_message() { let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"viewer","isBot":false}}"#; let event = parse_ws_message(json); assert!(matches!( event, Some(BridgeEvent::ChatMessage { ref username, ref body, .. }) if username == "viewer" && body == "hello" )); } #[test] fn test_parse_ws_bot_message_ignored() { let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"bot","isBot":true}}"#; assert!(parse_ws_message(json).is_none()); } } ``` **Step 2: Add module and verify compilation** Run: `cargo check` Expected: compiles **Step 3: Run tests** Run: `cargo test --lib websocket` Expected: all PASS **Step 4: Commit** ```bash git add -A git commit -m "feat: add WebSocket task for Owncast chat with reconnect" ``` --- ### Task 10: Control Task & Unix Socket **Files:** - Create: `src/control.rs` - Modify: `src/main.rs` (add `mod control;`) **Step 1: Write tests for command parsing** At the bottom of `src/control.rs`: ```rust #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_irc_commands() { assert!(matches!(parse_command("irc connect"), Some(ParsedCommand::IrcConnect))); assert!(matches!(parse_command("irc disconnect"), Some(ParsedCommand::IrcDisconnect))); assert!(matches!(parse_command("irc reconnect"), Some(ParsedCommand::IrcReconnect))); } #[test] fn test_parse_owncast_commands() { assert!(matches!(parse_command("owncast connect"), Some(ParsedCommand::OwncastConnect))); assert!(matches!(parse_command("owncast disconnect"), Some(ParsedCommand::OwncastDisconnect))); assert!(matches!(parse_command("owncast reconnect"), Some(ParsedCommand::OwncastReconnect))); } #[test] fn test_parse_status_and_quit() { assert!(matches!(parse_command("status"), Some(ParsedCommand::Status))); assert!(matches!(parse_command("quit"), Some(ParsedCommand::Quit))); } #[test] fn test_parse_unknown() { 
assert!(parse_command("unknown command").is_none()); assert!(parse_command("").is_none()); } #[test] fn test_parse_case_insensitive_trimmed() { assert!(matches!(parse_command(" IRC CONNECT "), Some(ParsedCommand::IrcConnect))); assert!(matches!(parse_command("QUIT\n"), Some(ParsedCommand::Quit))); } } ``` **Step 2: Run tests to verify they fail** Run: `cargo test --lib control` Expected: FAIL **Step 3: Implement control socket listener** ```rust use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixListener; use tokio::sync::{mpsc, oneshot}; use tracing::{error, info, warn}; use crate::events::{BridgeStatus, ControlCommand}; #[derive(Debug, PartialEq)] enum ParsedCommand { IrcConnect, IrcDisconnect, IrcReconnect, OwncastConnect, OwncastDisconnect, OwncastReconnect, Status, Quit, } fn parse_command(input: &str) -> Option { let trimmed = input.trim().to_lowercase(); match trimmed.as_str() { "irc connect" => Some(ParsedCommand::IrcConnect), "irc disconnect" => Some(ParsedCommand::IrcDisconnect), "irc reconnect" => Some(ParsedCommand::IrcReconnect), "owncast connect" => Some(ParsedCommand::OwncastConnect), "owncast disconnect" => Some(ParsedCommand::OwncastDisconnect), "owncast reconnect" => Some(ParsedCommand::OwncastReconnect), "status" => Some(ParsedCommand::Status), "quit" => Some(ParsedCommand::Quit), _ => None, } } pub async fn run_control_socket( socket_path: &str, control_tx: mpsc::Sender, ) -> anyhow::Result<()> { let _ = std::fs::remove_file(socket_path); let listener = UnixListener::bind(socket_path)?; info!(path = %socket_path, "Control socket listening"); loop { let (stream, _) = listener.accept().await?; let control_tx = control_tx.clone(); tokio::spawn(async move { let (reader, mut writer) = stream.into_split(); let mut reader = BufReader::new(reader); let mut line = String::new(); if reader.read_line(&mut line).await.is_err() { return; } match parse_command(&line) { Some(ParsedCommand::Status) => { let (reply_tx, reply_rx) = 
oneshot::channel(); let cmd = ControlCommand::Status { reply: reply_tx }; if control_tx.send(cmd).await.is_ok() { if let Ok(status) = reply_rx.await { let json = serde_json::to_string_pretty(&status).unwrap_or_default(); let _ = writer.write_all(json.as_bytes()).await; let _ = writer.write_all(b"\n").await; } } } Some(parsed) => { let cmd = match parsed { ParsedCommand::IrcConnect => ControlCommand::IrcConnect, ParsedCommand::IrcDisconnect => ControlCommand::IrcDisconnect, ParsedCommand::IrcReconnect => ControlCommand::IrcReconnect, ParsedCommand::OwncastConnect => ControlCommand::OwncastConnect, ParsedCommand::OwncastDisconnect => ControlCommand::OwncastDisconnect, ParsedCommand::OwncastReconnect => ControlCommand::OwncastReconnect, ParsedCommand::Quit => ControlCommand::Quit, ParsedCommand::Status => unreachable!(), }; if control_tx.send(cmd).await.is_ok() { let _ = writer.write_all(b"OK\n").await; } else { let _ = writer.write_all(b"ERROR: channel closed\n").await; } } None => { let _ = writer.write_all(b"ERROR: unknown command\n").await; } } }); } } ``` **Step 4: Run tests** Run: `cargo test --lib control` Expected: all PASS **Step 5: Commit** ```bash git add -A git commit -m "feat: add Unix socket control interface with command parsing" ``` --- ### Task 11: Router (Central Orchestration) **Files:** - Create: `src/router.rs` - Modify: `src/main.rs` (add `mod router;`) **Step 1: Write tests for echo suppression and dedup** At the bottom of `src/router.rs`: ```rust #[cfg(test)] mod tests { use super::*; #[test] fn test_dedup_tracker_new_id() { let mut tracker = DedupTracker::new(100); assert!(!tracker.is_duplicate("msg-1")); } #[test] fn test_dedup_tracker_duplicate() { let mut tracker = DedupTracker::new(100); assert!(!tracker.is_duplicate("msg-1")); assert!(tracker.is_duplicate("msg-1")); } #[test] fn test_dedup_tracker_evicts_old() { let mut tracker = DedupTracker::new(2); tracker.is_duplicate("a"); tracker.is_duplicate("b"); tracker.is_duplicate("c"); 
        assert!(!tracker.is_duplicate("a"));
    }

    #[test]
    fn test_echo_suppressor() {
        let mut suppressor = EchoSuppressor::new(10);
        suppressor.record_sent("hello from IRC");
        assert!(suppressor.is_echo("hello from IRC"));
        assert!(!suppressor.is_echo("different message"));
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test router`
Expected: FAIL

**Step 3: Implement router**

```rust
use std::collections::{HashSet, VecDeque};
use std::time::Instant;

use tokio::sync::mpsc;
use tracing::{info, warn};

use crate::config::BridgeSettings;
use crate::events::{BridgeEvent, ControlCommand, OwncastState, Source};
use crate::owncast_api::OwncastApiClient;

/// Remembers the most recent message IDs so the same message arriving via
/// more than one path is forwarded only once.
pub struct DedupTracker {
    seen: VecDeque<String>,
    set: HashSet<String>,
    capacity: usize,
}

impl DedupTracker {
    pub fn new(capacity: usize) -> Self {
        Self {
            seen: VecDeque::with_capacity(capacity),
            set: HashSet::with_capacity(capacity),
            capacity,
        }
    }

    /// Returns `true` if `id` was already seen; otherwise records it,
    /// evicting the oldest ID once `capacity` is reached.
    pub fn is_duplicate(&mut self, id: &str) -> bool {
        if self.set.contains(id) {
            return true;
        }

        if self.seen.len() >= self.capacity {
            if let Some(old) = self.seen.pop_front() {
                self.set.remove(&old);
            }
        }

        self.set.insert(id.to_string());
        self.seen.push_back(id.to_string());
        false
    }
}

/// Remembers bodies the bridge itself sent to Owncast so they are not
/// relayed back to IRC when Owncast reports them.
pub struct EchoSuppressor {
    recent: VecDeque<String>,
    capacity: usize,
}

impl EchoSuppressor {
    pub fn new(capacity: usize) -> Self {
        Self {
            recent: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    pub fn record_sent(&mut self, body: &str) {
        if self.recent.len() >= self.capacity {
            self.recent.pop_front();
        }
        self.recent.push_back(body.to_string());
    }

    /// Returns `true` and forgets the entry if `body` matches a recorded send.
    pub fn is_echo(&mut self, body: &str) -> bool {
        if let Some(pos) = self.recent.iter().position(|s| s == body) {
            self.recent.remove(pos);
            true
        } else {
            false
        }
    }
}

pub async fn run_router(
    settings: BridgeSettings,
    owncast_url: String,
    api_client: OwncastApiClient,
    mut event_rx: mpsc::Receiver<BridgeEvent>,
    irc_outbound_tx: mpsc::Sender<String>,
    mut state_rx: mpsc::Receiver<OwncastState>,
    mut control_rx: mpsc::Receiver<ControlCommand>,
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    start_time: Instant,
) {
    let mut dedup = DedupTracker::new(500);
    let mut echo_suppressor = EchoSuppressor::new(50);
    let mut owncast_state = OwncastState::Unavailable;

    loop {
        tokio::select! {
            Some(event) = event_rx.recv() => {
                handle_event(
                    event,
                    &settings,
                    &owncast_url,
                    &api_client,
                    &irc_outbound_tx,
                    &mut dedup,
                    &mut echo_suppressor,
                    &owncast_state,
                ).await;
            }
            Some(new_state) = state_rx.recv() => {
                if new_state != owncast_state {
                    handle_state_change(&new_state, &owncast_state, &irc_outbound_tx).await;
                    owncast_state = new_state;
                }
            }
            Some(cmd) = control_rx.recv() => {
                match cmd {
                    ControlCommand::Quit => {
                        info!("Quit command received, shutting down");
                        let _ = shutdown_tx.send(true);
                        return;
                    }
                    ControlCommand::Status { reply } => {
                        // Connection flags are hard-coded placeholders for now;
                        // only the Owncast state and uptime reflect live data.
                        let status = crate::events::BridgeStatus {
                            irc_connected: true,
                            owncast_state: format!("{:?}", owncast_state),
                            webhook_listening: true,
                            websocket_connected: false,
                            uptime_secs: start_time.elapsed().as_secs(),
                        };
                        let _ = reply.send(status);
                    }
                    other => {
                        warn!(command = ?other, "Control command not yet implemented in router");
                    }
                }
            }
            // Every channel is closed: nothing left to route.
            else => break,
        }
    }
}

async fn handle_event(
    event: BridgeEvent,
    settings: &BridgeSettings,
    owncast_url: &str,
    api_client: &OwncastApiClient,
    irc_tx: &mpsc::Sender<String>,
    dedup: &mut DedupTracker,
    echo: &mut EchoSuppressor,
    owncast_state: &OwncastState,
) {
    match event {
        BridgeEvent::ChatMessage { source, username, body, id } => {
            if let Some(ref msg_id) = id {
                if dedup.is_duplicate(msg_id) {
                    return;
                }
            }

            match source {
                Source::Irc => {
                    if *owncast_state == OwncastState::Unavailable {
                        return;
                    }
                    let formatted = format!("{} <{}> {}", settings.irc_prefix, username, body);
                    // Record exactly what is sent so the copy Owncast reports back can be dropped.
                    echo.record_sent(&formatted);
                    if let Err(e) = api_client.send_chat_message(&formatted).await {
                        warn!(error = %e, "Failed to send to Owncast");
                    }
                }
                Source::Owncast => {
                    // Drop messages the bridge itself posted to Owncast.
                    if echo.is_echo(&body) {
                        return;
                    }
                    let formatted = format!("{} <{}> {}", settings.owncast_prefix, username, body);
                    if irc_tx.send(formatted).await.is_err() {
                        warn!("IRC outbound channel closed");
                    }
                }
            }
        }
        BridgeEvent::StreamStarted { title } => {
            let msg = if title.is_empty() {
                format!("Stream started — {}", owncast_url)
            } else {
                format!("Stream started: {} — {}", title, owncast_url)
            };
            let _ = irc_tx.send(msg).await;
        }
        BridgeEvent::StreamStopped => {
            let _ = irc_tx.send("Stream ended.".to_string()).await;
        }
    }
}

async fn handle_state_change(
    new: &OwncastState,
    _old: &OwncastState,
    irc_tx: &mpsc::Sender<String>,
) {
    match new {
        OwncastState::Online => {
            let _ = irc_tx.send("Owncast chat is now available.".to_string()).await;
        }
        OwncastState::Unavailable => {
            let _ = irc_tx.send("Owncast chat is currently unavailable.".to_string()).await;
        }
        OwncastState::OfflineChatOpen => {}
    }
}
```

**Step 4: Run tests**

Run: `cargo test router`
Expected: all PASS

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: add router with dedup, echo suppression, and state handling"
```

---

### Task 12: Main Entry Point (Wire Everything Together)

**Files:**
- Modify: `src/main.rs`

**Step 1: Implement main**

```rust
mod config;
mod control;
mod events;
mod health;
mod html;
mod irc_task;
mod owncast_api;
mod router;
mod webhook;
mod websocket;

use std::path::PathBuf;
use std::time::Instant;

use clap::Parser;
use tokio::sync::{mpsc, watch};
use tracing::info;

#[derive(Parser)]
#[command(name = "owncast-irc-bridge")]
#[command(about = "Bidirectional chat bridge between Owncast and IRC")]
struct Cli {
    /// Path to config file
    #[arg(short, long, default_value = "config.toml")]
    config: PathBuf,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .init();

    let cli = Cli::parse();
    let config = config::BridgeConfig::load(&cli.config)?;
    let access_token = config.owncast_access_token()?;

    info!("Starting owncast-irc-bridge");

    let start_time = Instant::now();
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let (event_tx, event_rx) = mpsc::channel(256);
    let (irc_outbound_tx, irc_outbound_rx) = mpsc::channel(256);
    let (state_tx, state_rx) = mpsc::channel(32);
    let (control_tx, control_rx) = mpsc::channel(32);

    let api_client = owncast_api::OwncastApiClient::new(
        config.owncast.url.clone(),
        access_token,
    );

    // IRC task
    let irc_config = config.irc.clone();
    let irc_event_tx = event_tx.clone();
    let irc_shutdown = shutdown_rx.clone();
    let irc_handle = tokio::spawn(async move {
        irc_task::run_irc_task(irc_config, irc_event_tx, irc_outbound_rx, irc_shutdown).await;
    });

    // Webhook server
    let webhook_port = config.owncast.webhook_port;
    let webhook_event_tx = event_tx.clone();
    let webhook_handle = tokio::spawn(async move {
        if let Err(e) = webhook::run_webhook_server(webhook_port, webhook_event_tx).await {
            tracing::error!(error = %e, "Webhook server failed");
        }
    });

    // WebSocket task (optional)
    let ws_handle = if config.owncast.websocket_enabled {
        let ws_url = config.owncast.url.clone();
        let ws_event_tx = event_tx.clone();
        let ws_shutdown = shutdown_rx.clone();
        Some(tokio::spawn(async move {
            websocket::run_websocket_task(ws_url, ws_event_tx, ws_shutdown).await;
        }))
    } else {
        None
    };

    // Health poller (uses its own client with an empty token)
    let health_api = owncast_api::OwncastApiClient::new(
        config.owncast.url.clone(),
        String::new(),
    );
    let health_interval = std::time::Duration::from_secs(config.owncast.health_poll_interval_secs);
    let health_shutdown = shutdown_rx.clone();
    let health_handle = tokio::spawn(async move {
        health::run_health_poller(&health_api, state_tx, health_interval, health_shutdown).await;
    });

    // Control socket
    let control_socket_path = config.control.socket_path.clone();
    let control_handle = tokio::spawn(async move {
        if let Err(e) = control::run_control_socket(&control_socket_path, control_tx).await {
            tracing::error!(error = %e, "Control socket failed");
        }
    });

    // Signal handling
    let sig_shutdown_tx = shutdown_tx.clone();
    tokio::spawn(async move {
        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to register SIGTERM handler");
        let mut sighup = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
            .expect("Failed to register SIGHUP handler");

        // Loop so that a SIGHUP does not end signal handling; only
        // SIGINT/SIGTERM break out after requesting shutdown.
        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    info!("SIGINT received, shutting down");
                    let _ = sig_shutdown_tx.send(true);
                    break;
                }
                _ = sigterm.recv() => {
                    info!("SIGTERM received, shutting down");
                    let _ = sig_shutdown_tx.send(true);
                    break;
                }
                _ = sighup.recv() => {
                    info!("SIGHUP received (reconnect not yet wired)");
                }
            }
        }
    });

    // Router (runs on main task). It only returns on `Quit` or once every
    // channel closes, so race it against the shutdown signal to let
    // SIGINT/SIGTERM end the process as well.
    let mut main_shutdown = shutdown_rx;
    tokio::select! {
        _ = router::run_router(
            config.bridge,
            config.owncast.url,
            api_client,
            event_rx,
            irc_outbound_tx,
            state_rx,
            control_rx,
            shutdown_tx,
            start_time,
        ) => {}
        _ = main_shutdown.changed() => {}
    }

    info!("Bridge shutting down");
    Ok(())
}
```

**Step 2: Verify it compiles**

Run: `cargo check`
Expected: compiles (warnings about the unused task handles are fine). Fix any issues with `Clone` derives or visibility.

**Step 3: Commit**

```bash
git add -A
git commit -m "feat: wire all tasks together in main with signal handling"
```

---

### Task 13: bridge-ctl CLI

**Files:**
- Modify: `src/bin/bridge_ctl.rs`

**Step 1: Implement bridge-ctl**

```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::time::Duration;

use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(name = "bridge-ctl")]
#[command(about = "Control a running owncast-irc-bridge instance")]
struct Cli {
    /// Path to the bridge control socket
    #[arg(short, long, default_value = "/tmp/owncast-irc-bridge.sock")]
    socket: PathBuf,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Show bridge status
    Status,
    /// Control IRC connection
    Irc {
        #[command(subcommand)]
        action: ConnectionAction,
    },
    /// Control Owncast connection
    Owncast {
        #[command(subcommand)]
        action: ConnectionAction,
    },
    /// Shut down the bridge
    Quit,
}

#[derive(Subcommand)]
enum ConnectionAction {
    Connect,
    Disconnect,
    Reconnect,
}

fn main() {
    let cli = Cli::parse();

    // Translate the parsed subcommand into the line protocol the daemon expects.
    let command_str = match &cli.command {
        Commands::Status => "status".to_string(),
        Commands::Quit => "quit".to_string(),
        Commands::Irc { action } => format!("irc {}", action_str(action)),
        Commands::Owncast { action } => format!("owncast {}", action_str(action)),
    };

    match send_command(&cli.socket, &command_str) {
        Ok(response) => print!("{response}"),
        Err(e) => {
            eprintln!("Error: {e}");
            std::process::exit(1);
        }
    }
}

fn action_str(action: &ConnectionAction) -> &'static str {
    match action {
        ConnectionAction::Connect => "connect",
        ConnectionAction::Disconnect => "disconnect",
        ConnectionAction::Reconnect => "reconnect",
    }
}

/// Sends one command line over the control socket and collects the reply
/// until the daemon closes the connection or the read timeout expires.
fn send_command(
    socket_path: &std::path::Path,
    command: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    let mut stream = UnixStream::connect(socket_path)?;
    stream.set_read_timeout(Some(Duration::from_secs(5)))?;

    writeln!(stream, "{command}")?;
    stream.flush()?;

    let reader = BufReader::new(stream);
    let mut response = String::new();
    for line in reader.lines() {
        match line {
            Ok(l) => {
                response.push_str(&l);
                response.push('\n');
            }
            // A read timeout surfaces as WouldBlock or TimedOut depending on the platform.
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
            Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => break,
            Err(e) => return Err(e.into()),
        }
    }

    Ok(response)
}
```

**Step 2: Verify it compiles**

Run: `cargo check --bin bridge-ctl`
Expected: compiles

**Step 3: Commit**

```bash
git add -A
git commit -m "feat: add bridge-ctl CLI for runtime control"
```

---

### Task 14: Dockerfile

**Files:**
- Create: `Dockerfile`

**Step 1: Write Dockerfile**

```dockerfile
FROM rust:1.85-slim-bookworm AS builder

RUN apt-get update && apt-get install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src/ src/

RUN cargo build --release

FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y ca-certificates libssl3 && rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/target/release/owncast-irc-bridge /usr/local/bin/
COPY --from=builder /app/target/release/bridge-ctl /usr/local/bin/

ENTRYPOINT ["owncast-irc-bridge"]
CMD ["--config", "/etc/owncast-irc-bridge/config.toml"]
```

**Step 2: Commit**

```bash
git add -A
git commit -m "feat: add multi-stage Dockerfile"
```

---

### Task 15: Add Clone derives and final compilation fix

At this point, do a full `cargo build` and fix any missing trait derives (e.g. `Clone` on `IrcConfig`), visibility issues, or type mismatches that appear. Ensure both binaries compile.

**Step 1: Full build**

Run: `cargo build`
Expected: compiles with both binaries

**Step 2: Run all tests**

Run: `cargo test`
Expected: all tests pass

**Step 3: Commit any fixes**

```bash
git add -A
git commit -m "fix: add missing derives and resolve compilation issues"
```
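
Most of the `Clone` fixes in this task are likely to come from the way `main.rs` copies a config section before moving it into a spawned task. The sketch below shows that pattern in isolation. It is an illustration under stated assumptions, not the project's actual type: this `IrcConfig` is a stand-in whose fields mirror the `[irc]` table in `config.example.toml`, `describe` is a hypothetical helper, and `std::thread::spawn` stands in for `tokio::spawn` so the example runs without dependencies.

```rust
use std::thread;

// Stand-in for the plan's `IrcConfig` (assumption: fields follow config.example.toml).
// Without `Clone` in this derive list, `config.clone()` below does not compile.
#[derive(Debug, Clone, PartialEq)]
struct IrcConfig {
    server: String,
    port: u16,
    tls: bool,
    nick: String,
    channel: String,
}

// Hypothetical helper used only to give the spawned worker something to do.
fn describe(cfg: &IrcConfig) -> String {
    let scheme = if cfg.tls { "ircs" } else { "irc" };
    format!(
        "{}://{}:{} as {} in {}",
        scheme, cfg.server, cfg.port, cfg.nick, cfg.channel
    )
}

fn main() {
    let config = IrcConfig {
        server: "irc.zeronode.net".to_string(),
        port: 6667,
        tls: false,
        nick: "owncast-bridge".to_string(),
        channel: "#BowlAfterBowl".to_string(),
    };

    // Clone first, then move the copy into the worker, mirroring
    // `let irc_config = config.irc.clone();` in main.rs.
    let irc_config = config.clone();
    let handle = thread::spawn(move || describe(&irc_config));
    let summary = handle.join().expect("worker thread panicked");

    // The original value is still usable after the copy was moved away.
    assert_eq!(summary, describe(&config));
    println!("{summary}");
}
```

If the derive is missing, the compiler reports that no `clone` method exists for the struct, which is the kind of error Step 1 is expected to surface.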