# Owncast–IRC Bridge Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a bidirectional Rust chat bridge between Owncast (`https://owncast.bowlafterbowl.com`) and IRC (`irc.zeronode.net` / `#BowlAfterBowl`).

**Architecture:** Single tokio async binary with concurrent tasks (IRC, Webhook, WebSocket, Health Poller, Router, Control Socket) communicating via mpsc channels. A separate `bridge-ctl` CLI binary talks to the daemon over a Unix socket.

**Tech Stack:** Rust, tokio, axum, irc crate, tokio-tungstenite, reqwest, serde/toml, tracing, clap

**Design doc:** `docs/plans/2026-03-10-owncast-irc-bridge-design.md`
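
The channel layout described under Architecture can be sketched with the standard library alone. This is only an illustration under stated assumptions: OS threads and `std::sync::mpsc` stand in for tokio tasks and `tokio::sync::mpsc`, and the names `Event` and `fan_in_demo` are hypothetical, not part of the plan. Several producers hold clones of one sender, and a single consumer (the role the Router plays) drains the receiver.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the bridge's event type.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Event {
    FromIrc(String),
    FromOwncast(String),
}

/// Spawns two producers that share one channel and returns everything the
/// single consumer received, sorted so the order is stable.
pub fn fan_in_demo() -> Vec<Event> {
    let (tx, rx) = mpsc::channel::<Event>();

    let irc_tx = tx.clone();
    let irc = thread::spawn(move || {
        irc_tx
            .send(Event::FromIrc("hello from IRC".to_string()))
            .unwrap();
    });

    let owncast_tx = tx.clone();
    let owncast = thread::spawn(move || {
        owncast_tx
            .send(Event::FromOwncast("hello from Owncast".to_string()))
            .unwrap();
    });

    // Drop the original sender so the receiver sees the channel close
    // once both producers have finished.
    drop(tx);
    irc.join().unwrap();
    owncast.join().unwrap();

    let mut received: Vec<Event> = rx.iter().collect();
    received.sort();
    received
}
```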

---

### Task 1: Project Scaffolding

**Files:**
- Create: `Cargo.toml`
- Create: `src/main.rs`
- Create: `src/bin/bridge_ctl.rs`
- Create: `config.example.toml`
- Create: `.gitignore`

**Step 1: Initialize Cargo project**

Run:
```bash
cargo init --name owncast-irc-bridge
```

**Step 2: Set up Cargo.toml with all dependencies**

Replace `Cargo.toml` with:

```toml
[package]
name = "owncast-irc-bridge"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "owncast-irc-bridge"
path = "src/main.rs"

[[bin]]
name = "bridge-ctl"
path = "src/bin/bridge_ctl.rs"

[dependencies]
tokio = { version = "1", features = ["full"] }
axum = "0.8"
irc = "1"
tokio-tungstenite = { version = "0.26", features = ["native-tls"] }
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
clap = { version = "4", features = ["derive"] }
thiserror = "2"
anyhow = "1"
futures-util = "0.3"

[dev-dependencies]
tokio-test = "0.4"
```

**Step 3: Create `.gitignore`**

```gitignore
/target
.env
config.toml
```

**Step 4: Create `config.example.toml`**

```toml
[irc]
server = "irc.zeronode.net"
port = 6667
tls = false
nick = "owncast-bridge"
channel = "#BowlAfterBowl"

[owncast]
url = "https://owncast.bowlafterbowl.com"
# Set OWNCAST_ACCESS_TOKEN env var for the token
webhook_port = 9078
websocket_enabled = true
health_poll_interval_secs = 30

[bridge]
irc_prefix = "[IRC]"
owncast_prefix = "[OC]"
message_buffer_size = 0

[control]
socket_path = "/tmp/owncast-irc-bridge.sock"
```

**Step 5: Create placeholder `src/main.rs`**

```rust
fn main() {
    println!("owncast-irc-bridge");
}
```

**Step 6: Create placeholder `src/bin/bridge_ctl.rs`**

```rust
fn main() {
    println!("bridge-ctl");
}
```

**Step 7: Verify it compiles**

Run: `cargo check`
Expected: compiles with no errors (warnings about unused deps are fine)

**Step 8: Commit**

```bash
git init
git add -A
git commit -m "feat: scaffold project with dependencies and config"
```

---

### Task 2: Config Module

**Files:**
- Create: `src/config.rs`
- Modify: `src/main.rs` (add `mod config;`)

**Step 1: Write tests for config loading**

At the bottom of `src/config.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_minimal_config() {
        let toml_str = r#"
            [irc]
            server = "irc.example.com"
            channel = "#test"

            [owncast]
            url = "https://owncast.example.com"
        "#;
        let config: BridgeConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.irc.server, "irc.example.com");
        assert_eq!(config.irc.port, 6667);
        assert_eq!(config.irc.tls, false);
        assert_eq!(config.irc.nick, "owncast-bridge");
        assert_eq!(config.irc.channel, "#test");
        assert_eq!(config.owncast.url, "https://owncast.example.com");
        assert_eq!(config.owncast.webhook_port, 9078);
        assert_eq!(config.owncast.websocket_enabled, false);
        assert_eq!(config.owncast.health_poll_interval_secs, 30);
        assert_eq!(config.bridge.irc_prefix, "[IRC]");
        assert_eq!(config.bridge.owncast_prefix, "[OC]");
        assert_eq!(config.bridge.message_buffer_size, 0);
        assert_eq!(config.control.socket_path, "/tmp/owncast-irc-bridge.sock");
    }

    #[test]
    fn test_parse_full_config() {
        let toml_str = r#"
            [irc]
            server = "irc.example.com"
            port = 6697
            tls = true
            nick = "mybot"
            channel = "#mychan"

            [owncast]
            url = "https://oc.example.com"
            webhook_port = 8888
            websocket_enabled = true
            health_poll_interval_secs = 10

            [bridge]
            irc_prefix = "<IRC>"
            owncast_prefix = "<OC>"
            message_buffer_size = 50

            [control]
            socket_path = "/var/run/bridge.sock"
        "#;
        let config: BridgeConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.irc.port, 6697);
        assert!(config.irc.tls);
        assert_eq!(config.irc.nick, "mybot");
        assert_eq!(config.owncast.webhook_port, 8888);
        assert!(config.owncast.websocket_enabled);
        assert_eq!(config.bridge.message_buffer_size, 50);
        assert_eq!(config.control.socket_path, "/var/run/bridge.sock");
    }

    #[test]
    fn test_access_token_from_env() {
        std::env::set_var("OWNCAST_ACCESS_TOKEN", "test-token-123");
        let config = BridgeConfig::default_for_test();
        let token = config.owncast_access_token();
        assert_eq!(token.unwrap(), "test-token-123");
        std::env::remove_var("OWNCAST_ACCESS_TOKEN");
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test --bin owncast-irc-bridge config`
Expected: FAIL (`BridgeConfig` is not defined yet). Declare `mod config;` in `src/main.rs` first (Step 4); otherwise the test module is never compiled and the run reports 0 tests.

**Step 3: Implement config structs**

Write `src/config.rs`:

```rust
use std::path::Path;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct BridgeConfig {
    pub irc: IrcConfig,
    pub owncast: OwncastConfig,
    #[serde(default)]
    pub bridge: BridgeSettings,
    #[serde(default)]
    pub control: ControlConfig,
}

#[derive(Debug, Deserialize)]
pub struct IrcConfig {
    pub server: String,
    #[serde(default = "default_irc_port")]
    pub port: u16,
    #[serde(default)]
    pub tls: bool,
    #[serde(default = "default_nick")]
    pub nick: String,
    pub channel: String,
}

#[derive(Debug, Deserialize)]
pub struct OwncastConfig {
    pub url: String,
    #[serde(default = "default_webhook_port")]
    pub webhook_port: u16,
    #[serde(default)]
    pub websocket_enabled: bool,
    #[serde(default = "default_health_poll_interval")]
    pub health_poll_interval_secs: u64,
}

#[derive(Debug, Deserialize)]
pub struct BridgeSettings {
    #[serde(default = "default_irc_prefix")]
    pub irc_prefix: String,
    #[serde(default = "default_owncast_prefix")]
    pub owncast_prefix: String,
    #[serde(default)]
    pub message_buffer_size: usize,
}

#[derive(Debug, Deserialize)]
pub struct ControlConfig {
    #[serde(default = "default_socket_path")]
    pub socket_path: String,
}

impl BridgeConfig {
    pub fn load(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        let config: BridgeConfig = toml::from_str(&contents)?;
        Ok(config)
    }

    pub fn owncast_access_token(&self) -> anyhow::Result<String> {
        std::env::var("OWNCAST_ACCESS_TOKEN")
            .map_err(|_| anyhow::anyhow!("OWNCAST_ACCESS_TOKEN env var not set"))
    }
}

fn default_irc_port() -> u16 { 6667 }
fn default_nick() -> String { "owncast-bridge".to_string() }
fn default_webhook_port() -> u16 { 9078 }
fn default_health_poll_interval() -> u64 { 30 }
fn default_irc_prefix() -> String { "[IRC]".to_string() }
fn default_owncast_prefix() -> String { "[OC]".to_string() }
fn default_socket_path() -> String { "/tmp/owncast-irc-bridge.sock".to_string() }

impl Default for BridgeSettings {
    fn default() -> Self {
        Self {
            irc_prefix: default_irc_prefix(),
            owncast_prefix: default_owncast_prefix(),
            message_buffer_size: 0,
        }
    }
}

impl Default for ControlConfig {
    fn default() -> Self {
        Self { socket_path: default_socket_path() }
    }
}

#[cfg(test)]
impl BridgeConfig {
    pub fn default_for_test() -> Self {
        Self {
            irc: IrcConfig {
                server: "localhost".to_string(),
                port: 6667,
                tls: false,
                nick: "test-bot".to_string(),
                channel: "#test".to_string(),
            },
            owncast: OwncastConfig {
                url: "http://localhost:8080".to_string(),
                webhook_port: 9078,
                websocket_enabled: false,
                health_poll_interval_secs: 30,
            },
            bridge: BridgeSettings::default(),
            control: ControlConfig::default(),
        }
    }
}
```
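
`owncast_access_token` reads the process environment directly, which is why its test has to set and remove a global variable. As a minimal sketch, the same lookup can take the environment as a closure so it can be exercised without touching global state. The name `token_from` and the empty-value rule are assumptions for illustration, not part of the plan.

```rust
/// Looks up the Owncast access token through `lookup`, which is expected to
/// behave like `|key| std::env::var(key).ok()` in production.
pub fn token_from<F>(lookup: F) -> Result<String, String>
where
    F: Fn(&str) -> Option<String>,
{
    match lookup("OWNCAST_ACCESS_TOKEN") {
        // Assumption: an empty value is treated like a missing one.
        // The plan's version would accept an empty string.
        Some(token) if !token.is_empty() => Ok(token),
        _ => Err("OWNCAST_ACCESS_TOKEN env var not set".to_string()),
    }
}
```

In the daemon this could be called as `token_from(|key| std::env::var(key).ok())`, while tests pass a closure that returns a fixed value.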

**Step 4: Add module to main.rs**

Add `mod config;` to `src/main.rs`.

**Step 5: Run tests to verify they pass**

Run: `cargo test --bin owncast-irc-bridge config`
Expected: 3 tests PASS

**Step 6: Commit**

```bash
git add -A
git commit -m "feat: add config module with TOML parsing and env var support"
```

---

### Task 3: Events Module (Core Types)

**Files:**
- Create: `src/events.rs`
- Modify: `src/main.rs` (add `mod events;`)

**Step 1: Write `src/events.rs`**

```rust
use tokio::sync::oneshot;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Source {
    Irc,
    Owncast,
}

#[derive(Debug, Clone)]
pub enum BridgeEvent {
    ChatMessage {
        source: Source,
        username: String,
        body: String,
        id: Option<String>,
    },
    StreamStarted {
        title: String,
    },
    StreamStopped,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwncastState {
    Online,
    OfflineChatOpen,
    Unavailable,
}

#[derive(Debug)]
pub enum ControlCommand {
    IrcConnect,
    IrcDisconnect,
    IrcReconnect,
    OwncastConnect,
    OwncastDisconnect,
    OwncastReconnect,
    Status {
        reply: oneshot::Sender<BridgeStatus>,
    },
    Quit,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct BridgeStatus {
    pub irc_connected: bool,
    pub owncast_state: String,
    pub webhook_listening: bool,
    pub websocket_connected: bool,
    pub uptime_secs: u64,
}
```
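
To show how these types might be consumed, here is a rough sketch of the formatting step a router could apply before relaying. The output formats (`[IRC] <nick> text`, the stream announcements) and the function name `format_for_relay` are assumptions for illustration; the plan itself only fixes the prefixes. The enums are trimmed copies so the block stands alone.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Source {
    Irc,
    Owncast,
}

#[derive(Debug, Clone)]
pub enum BridgeEvent {
    ChatMessage { source: Source, username: String, body: String },
    StreamStarted { title: String },
    StreamStopped,
}

/// Returns the destination platform and the text to send there.
/// A chat message is relayed to the side it did not come from.
pub fn format_for_relay(
    event: &BridgeEvent,
    irc_prefix: &str,
    owncast_prefix: &str,
) -> (Source, String) {
    match event {
        BridgeEvent::ChatMessage { source: Source::Irc, username, body } => {
            (Source::Owncast, format!("{irc_prefix} <{username}> {body}"))
        }
        BridgeEvent::ChatMessage { source: Source::Owncast, username, body } => {
            (Source::Irc, format!("{owncast_prefix} <{username}> {body}"))
        }
        // Stream lifecycle events originate on Owncast, so announce them on IRC.
        BridgeEvent::StreamStarted { title } if title.is_empty() => {
            (Source::Irc, "Stream started".to_string())
        }
        BridgeEvent::StreamStarted { title } => {
            (Source::Irc, format!("Stream started: {title}"))
        }
        BridgeEvent::StreamStopped => (Source::Irc, "Stream ended".to_string()),
    }
}
```

Keeping this step a pure function means the prefix handling can be unit-tested without any channels or network connections.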

**Step 2: Add module to `src/main.rs`**

Add `mod events;`.

**Step 3: Verify it compiles**

Run: `cargo check`
Expected: compiles

**Step 4: Commit**

```bash
git add -A
git commit -m "feat: add events module with BridgeEvent, ControlCommand, OwncastState types"
```

---

### Task 4: HTML Stripping Utility

**Files:**
- Create: `src/html.rs`
- Modify: `src/main.rs` (add `mod html;`)

**Step 1: Write tests**

At the bottom of `src/html.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_plain_text_unchanged() {
        assert_eq!(strip_html("hello world"), "hello world");
    }

    #[test]
    fn test_strips_basic_tags() {
        assert_eq!(strip_html("<b>bold</b> text"), "bold text");
    }

    #[test]
    fn test_emoji_img_to_alt_text() {
        let input = r#"hello <img class="emoji" alt=":beerparrot:" title=":beerparrot:" src="/img/emoji/beerparrot.gif"> world"#;
        assert_eq!(strip_html(input), "hello :beerparrot: world");
    }

    #[test]
    fn test_multiple_emoji() {
        let input = r#"<img class="emoji" alt=":a:" src="/a.gif"><img class="emoji" alt=":b:" src="/b.gif">"#;
        assert_eq!(strip_html(input), ":a::b:");
    }

    #[test]
    fn test_strips_links() {
        let input = r#"check <a href="https://example.com">this link</a>"#;
        assert_eq!(strip_html(input), "check this link");
    }

    #[test]
    fn test_decodes_html_entities() {
        assert_eq!(strip_html("a &amp; b &lt; c"), "a & b < c");
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test --bin owncast-irc-bridge html`
Expected: FAIL (`strip_html` is not defined yet; this assumes `mod html;` is already declared in `src/main.rs`)

**Step 3: Implement `strip_html`**

```rust
use std::borrow::Cow;

/// Extracts alt text from <img> tags (for Owncast emoji) and strips all other HTML.
pub fn strip_html(input: &str) -> String {
    let mut result = String::with_capacity(input.len());
    let mut chars = input.chars().peekable();

    while let Some(&ch) = chars.peek() {
        if ch == '<' {
            // Consume '<' so that `tag` starts at the tag name.
            chars.next();
            // `take_while` also consumes the closing '>'.
            let tag: String = chars.by_ref().take_while(|&c| c != '>').collect();
            if tag.starts_with("img ") || tag.starts_with("img\t") {
                if let Some(alt) = extract_attr(&tag, "alt") {
                    result.push_str(&alt);
                }
            }
        } else if ch == '&' {
            // Consume '&'; `once(ch)` puts it back at the front of `entity`.
            chars.next();
            // The entity is collected without its trailing ';'.
            let entity: String = std::iter::once(ch)
                .chain(chars.by_ref().take_while(|&c| c != ';'))
                .collect();
            result.push_str(&decode_entity(&entity));
        } else {
            result.push(ch);
            chars.next();
        }
    }

    result
}

/// Returns the value of a double-quoted attribute, e.g. `alt="..."`.
fn extract_attr(tag: &str, attr_name: &str) -> Option<String> {
    let pattern = format!("{}=\"", attr_name);
    let start = tag.find(&pattern)? + pattern.len();
    let rest = &tag[start..];
    let end = rest.find('"')?;
    Some(rest[..end].to_string())
}

/// Decodes a named entity given without its trailing ';'.
/// Unknown entities are passed through unchanged.
fn decode_entity(entity: &str) -> Cow<'static, str> {
    match entity {
        "&amp" => Cow::Borrowed("&"),
        "&lt" => Cow::Borrowed("<"),
        "&gt" => Cow::Borrowed(">"),
        "&quot" => Cow::Borrowed("\""),
        "&#39" | "&apos" => Cow::Borrowed("'"),
        "&nbsp" => Cow::Borrowed(" "),
        other => Cow::Owned(other.to_string()),
    }
}
```
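
`decode_entity` only knows a handful of named entities, so numeric references such as `&#128512;` fall through to the catch-all arm. A possible extension is sketched below, assuming the same convention as above (the entity arrives without its trailing `;`). `decode_numeric_entity` is a hypothetical helper, not something the plan defines.

```rust
/// Decodes a numeric character reference such as "&#65" or "&#x41"
/// (no trailing ';'). Returns None for anything that is not a valid
/// numeric reference, so the caller can fall back to its own handling.
pub fn decode_numeric_entity(entity: &str) -> Option<char> {
    let digits = entity.strip_prefix("&#")?;
    let code = if let Some(hex) = digits
        .strip_prefix('x')
        .or_else(|| digits.strip_prefix('X'))
    {
        u32::from_str_radix(hex, 16).ok()?
    } else {
        digits.parse::<u32>().ok()?
    };
    // `char::from_u32` rejects surrogates and out-of-range values.
    char::from_u32(code)
}
```

One way to wire it in would be to try this helper in the catch-all arm of `decode_entity` before passing the text through unchanged.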

**Step 4: Run tests**

Run: `cargo test --bin owncast-irc-bridge html`
Expected: all PASS

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: add HTML stripping utility for Owncast emoji and markup"
```

---

### Task 5: Owncast API Sender

**Files:**
- Create: `src/owncast_api.rs`
- Modify: `src/main.rs` (add `mod owncast_api;`)

**Step 1: Implement the Owncast API client**

```rust
use reqwest::Client;
use tracing::{error, warn};

pub struct OwncastApiClient {
    client: Client,
    base_url: String,
    access_token: String,
}

impl OwncastApiClient {
    pub fn new(base_url: String, access_token: String) -> Self {
        Self {
            client: Client::new(),
            base_url: base_url.trim_end_matches('/').to_string(),
            access_token,
        }
    }

    pub async fn send_chat_message(&self, body: &str) -> anyhow::Result<()> {
        let url = format!("{}/api/integrations/chat/send", self.base_url);
        let resp = self
            .client
            .post(&url)
            .bearer_auth(&self.access_token)
            .json(&serde_json::json!({ "body": body }))
            .send()
            .await?;

        let status = resp.status();
        if status.is_success() {
            return Ok(());
        }

        let resp_body = resp.text().await.unwrap_or_default();
        if status.is_client_error() {
            error!(status = %status, body = %resp_body, "Owncast API client error (not retrying)");
            anyhow::bail!("Owncast API returned {status}");
        }

        warn!(status = %status, body = %resp_body, "Owncast API server error, retrying once");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let retry_resp = self
            .client
            .post(&url)
            .bearer_auth(&self.access_token)
            .json(&serde_json::json!({ "body": body }))
            .send()
            .await?;

        if retry_resp.status().is_success() {
            Ok(())
        } else {
            // Capture the status first: `text()` consumes the response.
            let retry_status = retry_resp.status();
            let retry_body = retry_resp.text().await.unwrap_or_default();
            error!(status = %retry_status, body = %retry_body, "Owncast API retry failed");
            anyhow::bail!("Owncast API retry returned {retry_status}")
        }
    }

    pub async fn get_status(&self) -> anyhow::Result<OwncastStatus> {
        let url = format!("{}/api/status", self.base_url);
        let resp = self.client.get(&url).send().await?;
        let status: OwncastStatus = resp.json().await?;
        Ok(status)
    }
}

#[derive(Debug, serde::Deserialize)]
pub struct OwncastStatus {
    pub online: bool,
    #[serde(default)]
    #[serde(rename = "streamTitle")]
    pub stream_title: Option<String>,
    #[serde(default)]
    #[serde(rename = "viewerCount")]
    pub viewer_count: Option<u64>,
}
```
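
The retry policy in `send_chat_message` (succeed on 2xx, give up immediately on 4xx, retry once on anything else) can be isolated as a pure function, which makes it easy to unit-test without a live server. This is a sketch only; `RetryDecision` and `classify_status` are hypothetical names that do not appear in the plan.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum RetryDecision {
    /// 2xx: the message was accepted.
    Done,
    /// 4xx: the request itself is wrong (bad token, bad body); retrying cannot help.
    GiveUp,
    /// Anything else, typically 5xx: worth one more attempt after a short pause.
    RetryOnce,
}

/// Maps a raw HTTP status code to the action the sender should take.
pub fn classify_status(status: u16) -> RetryDecision {
    match status {
        200..=299 => RetryDecision::Done,
        400..=499 => RetryDecision::GiveUp,
        _ => RetryDecision::RetryOnce,
    }
}
```

With reqwest, the input could come from `resp.status().as_u16()`.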

**Step 2: Add module and verify compilation**

Run: `cargo check`
Expected: compiles

**Step 3: Commit**

```bash
git add -A
git commit -m "feat: add Owncast API client for sending messages and checking status"
```

---

### Task 6: Health Poller

**Files:**
- Create: `src/health.rs`
- Modify: `src/main.rs` (add `mod health;`)

**Step 1: Implement health poller**

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tracing::{info, warn};

use crate::events::OwncastState;
use crate::owncast_api::OwncastApiClient;

pub async fn run_health_poller(
    api_client: &OwncastApiClient,
    state_tx: mpsc::Sender<OwncastState>,
    interval: Duration,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    let mut current_state = OwncastState::Unavailable;

    loop {
        tokio::select! {
            _ = tokio::time::sleep(interval) => {},
            _ = shutdown.changed() => {
                info!("Health poller shutting down");
                return;
            }
        }

        let new_state = match api_client.get_status().await {
            Ok(status) => {
                if status.online {
                    OwncastState::Online
                } else {
                    OwncastState::OfflineChatOpen
                }
            }
            Err(e) => {
                warn!(error = %e, "Failed to poll Owncast status");
                OwncastState::Unavailable
            }
        };

        if new_state != current_state {
            info!(old = ?current_state, new = ?new_state, "Owncast state changed");
            current_state = new_state.clone();
            if state_tx.send(new_state).await.is_err() {
                return;
            }
        }
    }
}
```
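
The poller only notifies the router when the derived state differs from the previous one. That edge-detection step can be sketched on its own. `StateTracker` is a hypothetical name, and `poll_result` stands in for the outcome of `get_status()` (`Ok(online)` or an error); the enum is copied so the block stands alone.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwncastState {
    Online,
    OfflineChatOpen,
    Unavailable,
}

pub struct StateTracker {
    current: OwncastState,
}

impl StateTracker {
    /// Starts as Unavailable, matching the poller's initial state.
    pub fn new() -> Self {
        Self { current: OwncastState::Unavailable }
    }

    /// Feeds one poll result in; returns Some(new_state) only on a change.
    pub fn observe(&mut self, poll_result: Result<bool, ()>) -> Option<OwncastState> {
        let new_state = match poll_result {
            Ok(true) => OwncastState::Online,
            Ok(false) => OwncastState::OfflineChatOpen,
            Err(()) => OwncastState::Unavailable,
        };
        if new_state == self.current {
            None
        } else {
            self.current = new_state.clone();
            Some(new_state)
        }
    }
}
```

One consequence of starting in `Unavailable` is that a server that is down at startup produces no notification until it first becomes reachable.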

**Step 2: Add module and verify compilation**

Run: `cargo check`
Expected: compiles

**Step 3: Commit**

```bash
git add -A
git commit -m "feat: add Owncast health poller with state change detection"
```

---

### Task 7: Webhook HTTP Server

**Files:**
- Create: `src/webhook.rs`
- Modify: `src/main.rs` (add `mod webhook;`)

**Step 1: Write tests for webhook payload parsing**

At the bottom of `src/webhook.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_chat_event() {
        let json = serde_json::json!({
            "type": "CHAT",
            "eventData": {
                "user": { "displayName": "viewer42", "isBot": false },
                "body": "hello <b>world</b>",
                "id": "abc123",
                "visible": true
            }
        });
        let payload: WebhookPayload = serde_json::from_value(json).unwrap();
        let event = payload.into_bridge_event();
        assert!(event.is_some());
        if let Some(BridgeEvent::ChatMessage { source, username, body, id }) = event {
            assert_eq!(source, Source::Owncast);
            assert_eq!(username, "viewer42");
            assert_eq!(body, "hello world");
            assert_eq!(id, Some("abc123".to_string()));
        } else {
            panic!("Expected ChatMessage");
        }
    }

    #[test]
    fn test_parse_stream_started() {
        let json = serde_json::json!({
            "type": "STREAM_STARTED",
            "eventData": {
                "id": "x",
                "name": "Test",
                "streamTitle": "Friday bowls",
                "summary": "",
                "timestamp": "2026-01-01T00:00:00Z"
            }
        });
        let payload: WebhookPayload = serde_json::from_value(json).unwrap();
        let event = payload.into_bridge_event();
        assert!(matches!(event, Some(BridgeEvent::StreamStarted { title }) if title == "Friday bowls"));
    }

    #[test]
    fn test_parse_stream_stopped() {
        let json = serde_json::json!({
            "type": "STREAM_STOPPED",
            "eventData": {
                "id": "x",
                "name": "Test",
                "streamTitle": "",
                "summary": "",
                "timestamp": "2026-01-01T00:00:00Z"
            }
        });
        let payload: WebhookPayload = serde_json::from_value(json).unwrap();
        let event = payload.into_bridge_event();
        assert!(matches!(event, Some(BridgeEvent::StreamStopped)));
    }

    #[test]
    fn test_ignores_bot_messages() {
        let json = serde_json::json!({
            "type": "CHAT",
            "eventData": {
                "user": { "displayName": "bot", "isBot": true },
                "body": "automated message",
                "id": "x",
                "visible": true
            }
        });
        let payload: WebhookPayload = serde_json::from_value(json).unwrap();
        let event = payload.into_bridge_event();
        assert!(event.is_none());
    }

    #[test]
    fn test_ignores_unknown_event_type() {
        let json = serde_json::json!({
            "type": "USER_JOINED",
            "eventData": { "id": "x" }
        });
        let payload: WebhookPayload = serde_json::from_value(json).unwrap();
        let event = payload.into_bridge_event();
        assert!(event.is_none());
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test --bin owncast-irc-bridge webhook`
Expected: FAIL (`WebhookPayload` is not defined yet; this assumes `mod webhook;` is already declared in `src/main.rs`)

**Step 3: Implement webhook server**

```rust
use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
use serde::Deserialize;
use tokio::sync::mpsc;
use tracing::{info, warn};

use crate::events::{BridgeEvent, Source};
use crate::html::strip_html;

#[derive(Debug, Deserialize)]
pub struct WebhookPayload {
    #[serde(rename = "type")]
    pub event_type: String,
    #[serde(rename = "eventData")]
    pub event_data: serde_json::Value,
}

#[derive(Debug, Deserialize)]
struct ChatEventData {
    user: ChatUser,
    body: String,
    id: String,
    #[serde(default = "default_visible")]
    visible: bool,
}

fn default_visible() -> bool { true }

#[derive(Debug, Deserialize)]
struct ChatUser {
    #[serde(rename = "displayName")]
    display_name: String,
    #[serde(default, rename = "isBot")]
    is_bot: bool,
}

#[derive(Debug, Deserialize)]
struct StreamEventData {
    #[serde(default, rename = "streamTitle")]
    stream_title: Option<String>,
}

impl WebhookPayload {
    pub fn into_bridge_event(self) -> Option<BridgeEvent> {
        match self.event_type.as_str() {
            "CHAT" => {
                let data: ChatEventData = serde_json::from_value(self.event_data).ok()?;
                if data.user.is_bot || !data.visible {
                    return None;
                }
                Some(BridgeEvent::ChatMessage {
                    source: Source::Owncast,
                    username: data.user.display_name,
                    body: strip_html(&data.body),
                    id: Some(data.id),
                })
            }
            "STREAM_STARTED" => {
                let data: StreamEventData = serde_json::from_value(self.event_data).ok()?;
                Some(BridgeEvent::StreamStarted {
                    title: data.stream_title.unwrap_or_default(),
                })
            }
            "STREAM_STOPPED" => Some(BridgeEvent::StreamStopped),
            _ => None,
        }
    }
}

#[derive(Clone)]
struct WebhookState {
    event_tx: mpsc::Sender<BridgeEvent>,
}

async fn handle_webhook(
    State(state): State<WebhookState>,
    Json(payload): Json<WebhookPayload>,
) -> StatusCode {
    info!(event_type = %payload.event_type, "Received webhook");

    match payload.into_bridge_event() {
        Some(event) => {
            if state.event_tx.send(event).await.is_err() {
                warn!("Router channel closed");
                return StatusCode::INTERNAL_SERVER_ERROR;
            }
            StatusCode::OK
        }
        None => StatusCode::OK,
    }
}

pub async fn run_webhook_server(
    port: u16,
    event_tx: mpsc::Sender<BridgeEvent>,
) -> anyhow::Result<()> {
    let state = WebhookState { event_tx };
    let app = Router::new()
        .route("/webhook", post(handle_webhook))
        .with_state(state);

    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
    info!(%addr, "Starting webhook server");

    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```
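
`into_bridge_event` makes two decisions before it looks at any details: which event types are relayed at all, and whether a chat message should be dropped. A std-only sketch of that decision table follows, with the JSON handling left out. `WebhookAction` and `decide` are hypothetical names used only here.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum WebhookAction {
    RelayChat,
    AnnounceStart,
    AnnounceStop,
    Ignore,
}

/// `is_bot` and `visible` only matter for "CHAT" events.
pub fn decide(event_type: &str, is_bot: bool, visible: bool) -> WebhookAction {
    match event_type {
        // Skipping bot messages presumably keeps the bridge from echoing
        // messages it posted itself (an assumption about intent); messages
        // that are not visible are dropped as well.
        "CHAT" if is_bot || !visible => WebhookAction::Ignore,
        "CHAT" => WebhookAction::RelayChat,
        "STREAM_STARTED" => WebhookAction::AnnounceStart,
        "STREAM_STOPPED" => WebhookAction::AnnounceStop,
        // Every other event type is acknowledged but not relayed.
        _ => WebhookAction::Ignore,
    }
}
```

Note that the handler returns `200 OK` for ignored events too, so the sender sees a success response whether or not anything was relayed.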

**Step 4: Run tests**

Run: `cargo test --bin owncast-irc-bridge webhook`
Expected: all PASS

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: add webhook server with payload parsing and chat/stream event handling"
```

---

### Task 8: IRC Task

**Files:**
- Create: `src/irc_task.rs`
- Modify: `src/main.rs` (add `mod irc_task;`)

**Step 1: Implement IRC task**

```rust
use std::time::Duration;

use futures_util::StreamExt;
use irc::client::prelude::*;
use tokio::sync::mpsc;
use tracing::{error, info};

use crate::config::IrcConfig;
use crate::events::{BridgeEvent, Source};

pub async fn run_irc_task(
    config: IrcConfig,
    event_tx: mpsc::Sender<BridgeEvent>,
    mut outbound_rx: mpsc::Receiver<String>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    let mut backoff = Duration::from_secs(1);
    let max_backoff = Duration::from_secs(60);

    loop {
        info!(server = %config.server, channel = %config.channel, "Connecting to IRC");

        match connect_and_run(&config, &event_tx, &mut outbound_rx, &mut shutdown).await {
            Ok(()) => {
                info!("IRC task exiting cleanly");
                return;
            }
            Err(e) => {
                error!(error = %e, "IRC connection error");
                info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff");

                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {},
                    _ = shutdown.changed() => return,
                }

                backoff = (backoff * 2).min(max_backoff);
            }
        }
    }
}

async fn connect_and_run(
    config: &IrcConfig,
    event_tx: &mpsc::Sender<BridgeEvent>,
    outbound_rx: &mut mpsc::Receiver<String>,
    shutdown: &mut tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<()> {
    let irc_config = Config {
        nickname: Some(config.nick.clone()),
        server: Some(config.server.clone()),
        port: Some(config.port),
        use_tls: Some(config.tls),
        channels: vec![config.channel.clone()],
        ..Config::default()
    };

    let mut client = Client::from_config(irc_config).await?;
    client.identify()?;

    let mut stream = client.stream()?;
    let sender = client.sender();

    info!("IRC connected, waiting for messages");

    loop {
        tokio::select! {
            msg = stream.next() => {
                match msg {
                    Some(Ok(message)) => {
                        if let Command::PRIVMSG(ref target, ref text) = message.command {
                            // Only relay messages sent to the bridged channel.
                            if target == &config.channel {
                                let nick = message.source_nickname().unwrap_or("unknown").to_string();
                                let event = BridgeEvent::ChatMessage {
                                    source: Source::Irc,
                                    username: nick,
                                    body: text.clone(),
                                    id: None,
                                };
                                if event_tx.send(event).await.is_err() {
                                    return Ok(());
                                }
                            }
                        }
                    }
                    Some(Err(e)) => return Err(e.into()),
                    None => return Err(anyhow::anyhow!("IRC stream ended")),
                }
            }
            Some(text) = outbound_rx.recv() => {
                sender.send_privmsg(&config.channel, &text)?;
            }
            _ = shutdown.changed() => {
                let _ = sender.send_quit("Bridge shutting down");
                return Ok(());
            }
        }
    }
}
```
|
|||
|
|
|
|||
|
|
**Step 2: Add module and verify compilation**
|
|||
|
|
|
|||
|
|
Run: `cargo check`
|
|||
|
|
Expected: compiles
|
|||
|
|
|
|||
|
|
**Step 3: Commit**
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
git add -A
|
|||
|
|
git commit -m "feat: add IRC task with auto-reconnect and backoff"
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
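The reconnect loop in `run_irc_task` doubles its delay after each failure and caps it at 60 seconds. A minimal std-only sketch of that schedule follows. The helper names `next_backoff` and `backoff_schedule` are hypothetical and exist only for illustration; they are not part of the plan's code.

```rust
use std::time::Duration;

/// Doubles the current delay, clamped to `max` (hypothetical helper).
fn next_backoff(current: Duration, max: Duration) -> Duration {
    (current * 2).min(max)
}

/// Returns the first `attempts` delays, starting from `initial`.
fn backoff_schedule(initial: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(attempts);
    let mut current = initial;
    for _ in 0..attempts {
        delays.push(current);
        current = next_backoff(current, max);
    }
    delays
}
```

With a 1 second start and a 60 second cap, eight consecutive failures would wait 1, 2, 4, 8, 16, 32, 60, and 60 seconds.
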

---

### Task 9: WebSocket Task

**Files:**
- Create: `src/websocket.rs`
- Modify: `src/main.rs` (add `mod websocket;`)

**Step 1: Implement WebSocket task**

```rust
use std::time::{Duration, Instant};

use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_tungstenite::connect_async;
use tracing::{info, warn};

use crate::events::{BridgeEvent, Source};
use crate::html::strip_html;

pub async fn run_websocket_task(
    owncast_url: String,
    event_tx: mpsc::Sender<BridgeEvent>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    let initial_backoff = Duration::from_secs(1);
    let max_backoff = Duration::from_secs(60);
    let mut backoff = initial_backoff;

    loop {
        let ws_url = build_ws_url(&owncast_url);
        info!(url = %ws_url, "Connecting to Owncast WebSocket");

        let started = Instant::now();
        match connect_and_listen(&ws_url, &event_tx, &mut shutdown).await {
            Ok(()) => {
                info!("WebSocket task exiting cleanly");
                return;
            }
            Err(e) => {
                warn!(error = %e, "WebSocket connection error");
                // A session that outlived the maximum backoff counts as healthy,
                // so restart the delay instead of continuing to double it.
                if started.elapsed() > max_backoff {
                    backoff = initial_backoff;
                }
                info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff");

                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {},
                    _ = shutdown.changed() => return,
                }

                backoff = (backoff * 2).min(max_backoff);
            }
        }
    }
}

/// Maps the Owncast base URL to its WebSocket endpoint (`https` -> `wss`, `http` -> `ws`).
fn build_ws_url(base_url: &str) -> String {
    let base = base_url.trim_end_matches('/');
    let ws_base = if base.starts_with("https://") {
        base.replacen("https://", "wss://", 1)
    } else {
        base.replacen("http://", "ws://", 1)
    };
    format!("{}/ws", ws_base)
}

async fn connect_and_listen(
    ws_url: &str,
    event_tx: &mpsc::Sender<BridgeEvent>,
    shutdown: &mut tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<()> {
    let (ws_stream, _) = connect_async(ws_url).await?;
    // The write half is unused but kept alive for the lifetime of the connection.
    let (_write, mut read) = ws_stream.split();

    info!("WebSocket connected");

    loop {
        tokio::select! {
            msg = read.next() => {
                match msg {
                    Some(Ok(ws_msg)) => {
                        if let Ok(text) = ws_msg.into_text() {
                            if let Some(event) = parse_ws_message(&text) {
                                if event_tx.send(event).await.is_err() {
                                    return Ok(());
                                }
                            }
                        }
                    }
                    Some(Err(e)) => return Err(e.into()),
                    None => return Err(anyhow::anyhow!("WebSocket stream ended")),
                }
            }
            _ = shutdown.changed() => return Ok(()),
        }
    }
}

/// Parses one WebSocket frame; only non-bot `CHAT` messages become events.
fn parse_ws_message(text: &str) -> Option<BridgeEvent> {
    let value: serde_json::Value = serde_json::from_str(text).ok()?;
    let msg_type = value.get("type")?.as_str()?;

    match msg_type {
        "CHAT" => {
            let user = value.get("user")?;
            let is_bot = user.get("isBot").and_then(|v| v.as_bool()).unwrap_or(false);
            if is_bot {
                return None;
            }
            let display_name = user.get("displayName")?.as_str()?.to_string();
            let body = value.get("body")?.as_str()?;
            let id = value.get("id").and_then(|v| v.as_str()).map(String::from);

            Some(BridgeEvent::ChatMessage {
                source: Source::Owncast,
                username: display_name,
                body: strip_html(body),
                id,
            })
        }
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_ws_url_https() {
        assert_eq!(
            build_ws_url("https://owncast.example.com"),
            "wss://owncast.example.com/ws"
        );
    }

    #[test]
    fn test_build_ws_url_http() {
        assert_eq!(
            build_ws_url("http://localhost:8080"),
            "ws://localhost:8080/ws"
        );
    }

    #[test]
    fn test_build_ws_url_trailing_slash() {
        assert_eq!(
            build_ws_url("https://owncast.example.com/"),
            "wss://owncast.example.com/ws"
        );
    }

    #[test]
    fn test_parse_ws_chat_message() {
        let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"viewer","isBot":false}}"#;
        let event = parse_ws_message(json);
        assert!(matches!(
            event,
            Some(BridgeEvent::ChatMessage { ref username, ref body, .. })
                if username == "viewer" && body == "hello"
        ));
    }

    #[test]
    fn test_parse_ws_bot_message_ignored() {
        let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"bot","isBot":true}}"#;
        assert!(parse_ws_message(json).is_none());
    }
}
```

**Step 2: Add module and verify compilation**

Run: `cargo check`
Expected: compiles

**Step 3: Run tests**

Run: `cargo test --lib websocket`
Expected: all PASS

**Step 4: Commit**

```bash
git add -A
git commit -m "feat: add WebSocket task for Owncast chat with reconnect"
```


---

### Task 10: Control Task & Unix Socket

**Files:**
- Create: `src/control.rs`
- Modify: `src/main.rs` (add `mod control;`)

**Step 1: Write tests for command parsing**

At the bottom of `src/control.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_irc_commands() {
        assert!(matches!(parse_command("irc connect"), Some(ParsedCommand::IrcConnect)));
        assert!(matches!(parse_command("irc disconnect"), Some(ParsedCommand::IrcDisconnect)));
        assert!(matches!(parse_command("irc reconnect"), Some(ParsedCommand::IrcReconnect)));
    }

    #[test]
    fn test_parse_owncast_commands() {
        assert!(matches!(parse_command("owncast connect"), Some(ParsedCommand::OwncastConnect)));
        assert!(matches!(parse_command("owncast disconnect"), Some(ParsedCommand::OwncastDisconnect)));
        assert!(matches!(parse_command("owncast reconnect"), Some(ParsedCommand::OwncastReconnect)));
    }

    #[test]
    fn test_parse_status_and_quit() {
        assert!(matches!(parse_command("status"), Some(ParsedCommand::Status)));
        assert!(matches!(parse_command("quit"), Some(ParsedCommand::Quit)));
    }

    #[test]
    fn test_parse_unknown() {
        assert!(parse_command("unknown command").is_none());
        assert!(parse_command("").is_none());
    }

    #[test]
    fn test_parse_case_insensitive_trimmed() {
        assert!(matches!(parse_command(" IRC CONNECT "), Some(ParsedCommand::IrcConnect)));
        assert!(matches!(parse_command("QUIT\n"), Some(ParsedCommand::Quit)));
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test --lib control`
Expected: FAIL (compile error, since `parse_command` and `ParsedCommand` do not exist yet)

**Step 3: Implement control socket listener**

```rust
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
use tracing::info;

use crate::events::ControlCommand;

#[derive(Debug, PartialEq)]
enum ParsedCommand {
    IrcConnect,
    IrcDisconnect,
    IrcReconnect,
    OwncastConnect,
    OwncastDisconnect,
    OwncastReconnect,
    Status,
    Quit,
}

/// Parses one command line; input is trimmed and matched case-insensitively.
fn parse_command(input: &str) -> Option<ParsedCommand> {
    let trimmed = input.trim().to_lowercase();
    match trimmed.as_str() {
        "irc connect" => Some(ParsedCommand::IrcConnect),
        "irc disconnect" => Some(ParsedCommand::IrcDisconnect),
        "irc reconnect" => Some(ParsedCommand::IrcReconnect),
        "owncast connect" => Some(ParsedCommand::OwncastConnect),
        "owncast disconnect" => Some(ParsedCommand::OwncastDisconnect),
        "owncast reconnect" => Some(ParsedCommand::OwncastReconnect),
        "status" => Some(ParsedCommand::Status),
        "quit" => Some(ParsedCommand::Quit),
        _ => None,
    }
}

pub async fn run_control_socket(
    socket_path: &str,
    control_tx: mpsc::Sender<ControlCommand>,
) -> anyhow::Result<()> {
    // Remove a stale socket file left behind by a previous run, if any.
    let _ = std::fs::remove_file(socket_path);
    let listener = UnixListener::bind(socket_path)?;
    info!(path = %socket_path, "Control socket listening");

    loop {
        let (stream, _) = listener.accept().await?;
        let control_tx = control_tx.clone();

        // Each connection carries exactly one command line and one reply.
        tokio::spawn(async move {
            let (reader, mut writer) = stream.into_split();
            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            if reader.read_line(&mut line).await.is_err() {
                return;
            }

            match parse_command(&line) {
                Some(ParsedCommand::Status) => {
                    // Status needs an answer from the router, so it carries a reply channel.
                    let (reply_tx, reply_rx) = oneshot::channel();
                    let cmd = ControlCommand::Status { reply: reply_tx };
                    if control_tx.send(cmd).await.is_ok() {
                        if let Ok(status) = reply_rx.await {
                            let json = serde_json::to_string_pretty(&status).unwrap_or_default();
                            let _ = writer.write_all(json.as_bytes()).await;
                            let _ = writer.write_all(b"\n").await;
                        }
                    }
                }
                Some(parsed) => {
                    let cmd = match parsed {
                        ParsedCommand::IrcConnect => ControlCommand::IrcConnect,
                        ParsedCommand::IrcDisconnect => ControlCommand::IrcDisconnect,
                        ParsedCommand::IrcReconnect => ControlCommand::IrcReconnect,
                        ParsedCommand::OwncastConnect => ControlCommand::OwncastConnect,
                        ParsedCommand::OwncastDisconnect => ControlCommand::OwncastDisconnect,
                        ParsedCommand::OwncastReconnect => ControlCommand::OwncastReconnect,
                        ParsedCommand::Quit => ControlCommand::Quit,
                        // Handled by the arm above.
                        ParsedCommand::Status => unreachable!(),
                    };
                    if control_tx.send(cmd).await.is_ok() {
                        let _ = writer.write_all(b"OK\n").await;
                    } else {
                        let _ = writer.write_all(b"ERROR: channel closed\n").await;
                    }
                }
                None => {
                    let _ = writer.write_all(b"ERROR: unknown command\n").await;
                }
            }
        });
    }
}
```

**Step 4: Run tests**

Run: `cargo test --lib control`
Expected: all PASS

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: add Unix socket control interface with command parsing"
```

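`parse_command` in this task matches whole strings, so input with extra interior spaces (for example `irc   connect`) is rejected as unknown. If that tolerance were wanted, one option is to tokenize the line first. The sketch below is an assumption and not part of the plan; `Target`, `Action`, `Parsed`, and `parse_tokens` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
enum Target {
    Irc,
    Owncast,
}

#[derive(Debug, PartialEq)]
enum Action {
    Connect,
    Disconnect,
    Reconnect,
}

#[derive(Debug, PartialEq)]
enum Parsed {
    Connection(Target, Action),
    Status,
    Quit,
}

/// Splits on any whitespace, then matches on the token list (hypothetical helper).
fn parse_tokens(input: &str) -> Option<Parsed> {
    let lowered = input.to_lowercase();
    let tokens: Vec<&str> = lowered.split_whitespace().collect();
    match tokens.as_slice() {
        ["status"] => Some(Parsed::Status),
        ["quit"] => Some(Parsed::Quit),
        [target, action] => {
            let target = match *target {
                "irc" => Target::Irc,
                "owncast" => Target::Owncast,
                _ => return None,
            };
            let action = match *action {
                "connect" => Action::Connect,
                "disconnect" => Action::Disconnect,
                "reconnect" => Action::Reconnect,
                _ => return None,
            };
            Some(Parsed::Connection(target, action))
        }
        _ => None,
    }
}
```

Splitting the target from the action also avoids listing every combination by hand, at the cost of a slightly larger type surface than the flat `ParsedCommand` enum.
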

---

### Task 11: Router (Central Orchestration)

**Files:**
- Create: `src/router.rs`
- Modify: `src/main.rs` (add `mod router;`)

**Step 1: Write tests for echo suppression and dedup**

At the bottom of `src/router.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dedup_tracker_new_id() {
        let mut tracker = DedupTracker::new(100);
        assert!(!tracker.is_duplicate("msg-1"));
    }

    #[test]
    fn test_dedup_tracker_duplicate() {
        let mut tracker = DedupTracker::new(100);
        assert!(!tracker.is_duplicate("msg-1"));
        assert!(tracker.is_duplicate("msg-1"));
    }

    #[test]
    fn test_dedup_tracker_evicts_old() {
        let mut tracker = DedupTracker::new(2);
        tracker.is_duplicate("a");
        tracker.is_duplicate("b");
        tracker.is_duplicate("c");
        assert!(!tracker.is_duplicate("a"));
    }

    #[test]
    fn test_echo_suppressor() {
        let mut suppressor = EchoSuppressor::new(10);
        suppressor.record_sent("hello from IRC");
        assert!(suppressor.is_echo("hello from IRC"));
        assert!(!suppressor.is_echo("different message"));
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `cargo test --lib router`
Expected: FAIL (compile error, since `DedupTracker` and `EchoSuppressor` do not exist yet)

**Step 3: Implement router**

```rust
use std::collections::{HashSet, VecDeque};
use std::time::Instant;

use tokio::sync::mpsc;
use tracing::{info, warn};

use crate::config::BridgeSettings;
use crate::events::{BridgeEvent, ControlCommand, OwncastState, Source};
use crate::owncast_api::OwncastApiClient;

/// Bounded FIFO set of recently seen message IDs.
pub struct DedupTracker {
    seen: VecDeque<String>,
    set: HashSet<String>,
    capacity: usize,
}

impl DedupTracker {
    pub fn new(capacity: usize) -> Self {
        Self {
            seen: VecDeque::with_capacity(capacity),
            set: HashSet::with_capacity(capacity),
            capacity,
        }
    }

    /// Returns `true` if `id` was seen recently; otherwise records it and returns `false`.
    pub fn is_duplicate(&mut self, id: &str) -> bool {
        if self.set.contains(id) {
            return true;
        }
        // Evict the oldest ID once the tracker is full.
        if self.seen.len() >= self.capacity {
            if let Some(old) = self.seen.pop_front() {
                self.set.remove(&old);
            }
        }
        self.set.insert(id.to_string());
        self.seen.push_back(id.to_string());
        false
    }
}

/// Remembers messages the bridge itself sent so they are not relayed back.
pub struct EchoSuppressor {
    recent: VecDeque<String>,
    capacity: usize,
}

impl EchoSuppressor {
    pub fn new(capacity: usize) -> Self {
        Self {
            recent: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    pub fn record_sent(&mut self, body: &str) {
        if self.recent.len() >= self.capacity {
            self.recent.pop_front();
        }
        self.recent.push_back(body.to_string());
    }

    /// Returns `true` and forgets the entry if `body` matches a recorded message.
    pub fn is_echo(&mut self, body: &str) -> bool {
        if let Some(pos) = self.recent.iter().position(|s| s == body) {
            self.recent.remove(pos);
            true
        } else {
            false
        }
    }
}

pub async fn run_router(
    settings: BridgeSettings,
    owncast_url: String,
    api_client: OwncastApiClient,
    mut event_rx: mpsc::Receiver<BridgeEvent>,
    irc_outbound_tx: mpsc::Sender<String>,
    mut state_rx: mpsc::Receiver<OwncastState>,
    mut control_rx: mpsc::Receiver<ControlCommand>,
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    start_time: Instant,
) {
    let mut dedup = DedupTracker::new(500);
    let mut echo_suppressor = EchoSuppressor::new(50);
    let mut owncast_state = OwncastState::Unavailable;
    // Also observe shutdowns triggered elsewhere (e.g. by a signal handler holding
    // a clone of the sender), so the router does not keep running after them.
    let mut shutdown_rx = shutdown_tx.subscribe();

    loop {
        tokio::select! {
            Some(event) = event_rx.recv() => {
                handle_event(
                    event,
                    &settings,
                    &owncast_url,
                    &api_client,
                    &irc_outbound_tx,
                    &mut dedup,
                    &mut echo_suppressor,
                    &owncast_state,
                ).await;
            }
            Some(new_state) = state_rx.recv() => {
                if new_state != owncast_state {
                    handle_state_change(&new_state, &owncast_state, &irc_outbound_tx).await;
                    owncast_state = new_state;
                }
            }
            Some(cmd) = control_rx.recv() => {
                match cmd {
                    ControlCommand::Quit => {
                        info!("Quit command received, shutting down");
                        let _ = shutdown_tx.send(true);
                        return;
                    }
                    ControlCommand::Status { reply } => {
                        // Placeholder values: the router does not track connection state yet.
                        let status = crate::events::BridgeStatus {
                            irc_connected: true,
                            owncast_state: format!("{:?}", owncast_state),
                            webhook_listening: true,
                            websocket_connected: false,
                            uptime_secs: start_time.elapsed().as_secs(),
                        };
                        let _ = reply.send(status);
                    }
                    other => {
                        warn!(command = ?other, "Control command not yet implemented in router");
                    }
                }
            }
            _ = shutdown_rx.changed() => {
                info!("Shutdown signalled, stopping router");
                break;
            }
            else => break,
        }
    }
}

async fn handle_event(
    event: BridgeEvent,
    settings: &BridgeSettings,
    owncast_url: &str,
    api_client: &OwncastApiClient,
    irc_tx: &mpsc::Sender<String>,
    dedup: &mut DedupTracker,
    echo: &mut EchoSuppressor,
    owncast_state: &OwncastState,
) {
    match event {
        BridgeEvent::ChatMessage { source, username, body, id } => {
            // Messages with an ID (Owncast) may arrive more than once; forward only the first.
            if let Some(ref msg_id) = id {
                if dedup.is_duplicate(msg_id) {
                    return;
                }
            }

            match source {
                Source::Irc => {
                    if *owncast_state == OwncastState::Unavailable {
                        return;
                    }
                    let formatted = format!("{} <{}> {}", settings.irc_prefix, username, body);
                    // Record before sending so the echoed copy from Owncast can be dropped.
                    echo.record_sent(&formatted);
                    if let Err(e) = api_client.send_chat_message(&formatted).await {
                        warn!(error = %e, "Failed to send to Owncast");
                    }
                }
                Source::Owncast => {
                    let formatted = format!("{} <{}> {}", settings.owncast_prefix, username, body);
                    if echo.is_echo(&body) {
                        return;
                    }
                    if irc_tx.send(formatted).await.is_err() {
                        warn!("IRC outbound channel closed");
                    }
                }
            }
        }
        BridgeEvent::StreamStarted { title } => {
            let msg = if title.is_empty() {
                format!("Stream started — {}", owncast_url)
            } else {
                format!("Stream started: {} — {}", title, owncast_url)
            };
            let _ = irc_tx.send(msg).await;
        }
        BridgeEvent::StreamStopped => {
            let _ = irc_tx.send("Stream ended.".to_string()).await;
        }
    }
}

async fn handle_state_change(
    new: &OwncastState,
    _old: &OwncastState,
    irc_tx: &mpsc::Sender<String>,
) {
    match new {
        OwncastState::Online => {
            let _ = irc_tx.send("Owncast chat is now available.".to_string()).await;
        }
        OwncastState::Unavailable => {
            let _ = irc_tx.send("Owncast chat is currently unavailable.".to_string()).await;
        }
        OwncastState::OfflineChatOpen => {}
    }
}
```

**Step 4: Run tests**

Run: `cargo test --lib router`
Expected: all PASS

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: add router with dedup, echo suppression, and state handling"
```

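Both the webhook and the WebSocket task can deliver the same Owncast chat message, and the router relies on the message `id` to forward it only once. The std-only sketch below illustrates that idea with a bounded FIFO set. `SeenIds`, `first_sighting`, and `forward_once` are hypothetical names; the sketch restates the technique and is not the plan's exact code.

```rust
use std::collections::{HashSet, VecDeque};

/// Bounded record of recently seen message ids (hypothetical name).
struct SeenIds {
    order: VecDeque<String>,
    set: HashSet<String>,
    capacity: usize,
}

impl SeenIds {
    fn new(capacity: usize) -> Self {
        Self { order: VecDeque::new(), set: HashSet::new(), capacity }
    }

    /// Returns true the first time an id is observed, false for repeats.
    fn first_sighting(&mut self, id: &str) -> bool {
        if self.set.contains(id) {
            return false;
        }
        // Evict the oldest id once the record is full.
        if self.order.len() >= self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.set.remove(&oldest);
            }
        }
        self.set.insert(id.to_string());
        self.order.push_back(id.to_string());
        true
    }
}

/// Filters (path, id) deliveries down to the ones that should be forwarded.
fn forward_once<'a>(
    deliveries: &[(&'a str, &'a str)],
    capacity: usize,
) -> Vec<(&'a str, &'a str)> {
    let mut seen = SeenIds::new(capacity);
    deliveries
        .iter()
        .copied()
        .filter(|(_, id)| seen.first_sighting(id))
        .collect()
}
```

Given the deliveries `("webhook", "m1")`, `("websocket", "m1")`, `("websocket", "m2")`, `("webhook", "m2")`, only the first arrival of each id is kept, whichever path it came from. The capacity bounds memory, so an id older than the last `capacity` messages would be treated as new again.
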

---

### Task 12: Main Entry Point (Wire Everything Together)

**Files:**
- Modify: `src/main.rs`

**Step 1: Implement main**

```rust
mod config;
mod control;
mod events;
mod health;
mod html;
mod irc_task;
mod owncast_api;
mod router;
mod webhook;
mod websocket;

use std::path::PathBuf;
use std::time::{Duration, Instant};

use clap::Parser;
use tokio::sync::{mpsc, watch};
use tracing::info;

#[derive(Parser)]
#[command(name = "owncast-irc-bridge")]
#[command(about = "Bidirectional chat bridge between Owncast and IRC")]
struct Cli {
    /// Path to config file
    #[arg(short, long, default_value = "config.toml")]
    config: PathBuf,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .init();

    let cli = Cli::parse();
    let config = config::BridgeConfig::load(&cli.config)?;
    let access_token = config.owncast_access_token()?;

    info!("Starting owncast-irc-bridge");

    let start_time = Instant::now();
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    let (event_tx, event_rx) = mpsc::channel(256);
    let (irc_outbound_tx, irc_outbound_rx) = mpsc::channel(256);
    let (state_tx, state_rx) = mpsc::channel(32);
    let (control_tx, control_rx) = mpsc::channel(32);

    let api_client = owncast_api::OwncastApiClient::new(
        config.owncast.url.clone(),
        access_token,
    );

    // IRC task
    let irc_config = config.irc.clone();
    let irc_event_tx = event_tx.clone();
    let irc_shutdown = shutdown_rx.clone();
    let irc_handle = tokio::spawn(async move {
        irc_task::run_irc_task(irc_config, irc_event_tx, irc_outbound_rx, irc_shutdown).await;
    });

    // Webhook server
    let webhook_port = config.owncast.webhook_port;
    let webhook_event_tx = event_tx.clone();
    let webhook_handle = tokio::spawn(async move {
        if let Err(e) = webhook::run_webhook_server(webhook_port, webhook_event_tx).await {
            tracing::error!(error = %e, "Webhook server failed");
        }
    });

    // WebSocket task (optional)
    let ws_handle = if config.owncast.websocket_enabled {
        let ws_url = config.owncast.url.clone();
        let ws_event_tx = event_tx.clone();
        let ws_shutdown = shutdown_rx.clone();
        Some(tokio::spawn(async move {
            websocket::run_websocket_task(ws_url, ws_event_tx, ws_shutdown).await;
        }))
    } else {
        None
    };

    // Health poller (no access token needed)
    let health_api = owncast_api::OwncastApiClient::new(
        config.owncast.url.clone(),
        String::new(),
    );
    let health_interval = Duration::from_secs(config.owncast.health_poll_interval_secs);
    let health_shutdown = shutdown_rx.clone();
    let health_handle = tokio::spawn(async move {
        health::run_health_poller(&health_api, state_tx, health_interval, health_shutdown).await;
    });

    // Control socket
    let control_socket_path = config.control.socket_path.clone();
    let control_handle = tokio::spawn(async move {
        if let Err(e) = control::run_control_socket(&control_socket_path, control_tx).await {
            tracing::error!(error = %e, "Control socket failed");
        }
    });

    // Signal handling
    let sig_shutdown_tx = shutdown_tx.clone();
    tokio::spawn(async move {
        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to register SIGTERM handler");
        let mut sighup = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
            .expect("Failed to register SIGHUP handler");

        // Loop so that a SIGHUP does not end this task and leave later
        // SIGINT/SIGTERM signals unhandled.
        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    info!("SIGINT received, shutting down");
                    let _ = sig_shutdown_tx.send(true);
                    break;
                }
                _ = sigterm.recv() => {
                    info!("SIGTERM received, shutting down");
                    let _ = sig_shutdown_tx.send(true);
                    break;
                }
                _ = sighup.recv() => {
                    info!("SIGHUP received (reconnect not yet wired)");
                }
            }
        }
    });

    // Router (runs on main task)
    router::run_router(
        config.bridge,
        config.owncast.url,
        api_client,
        event_rx,
        irc_outbound_tx,
        state_rx,
        control_rx,
        shutdown_tx,
        start_time,
    )
    .await;

    info!("Bridge shutting down");

    // Give tasks that watch the shutdown channel a moment to finish
    // (for example, so the IRC task can send its QUIT).
    let grace = Duration::from_secs(5);
    let _ = tokio::time::timeout(grace, irc_handle).await;
    let _ = tokio::time::timeout(grace, health_handle).await;
    if let Some(handle) = ws_handle {
        let _ = tokio::time::timeout(grace, handle).await;
    }
    // The webhook server and control socket do not watch the shutdown channel.
    webhook_handle.abort();
    control_handle.abort();

    Ok(())
}
```

**Step 2: Verify it compiles**

Run: `cargo check`
Expected: compiles. Fix any issues with `Clone` derives or visibility.

**Step 3: Commit**

```bash
git add -A
git commit -m "feat: wire all tasks together in main with signal handling"
```


---

### Task 13: bridge-ctl CLI

**Files:**
- Modify: `src/bin/bridge_ctl.rs`

**Step 1: Implement bridge-ctl**

```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::time::Duration;

use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(name = "bridge-ctl")]
#[command(about = "Control a running owncast-irc-bridge instance")]
struct Cli {
    /// Path to the bridge control socket
    #[arg(short, long, default_value = "/tmp/owncast-irc-bridge.sock")]
    socket: PathBuf,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Show bridge status
    Status,
    /// Control IRC connection
    Irc {
        #[command(subcommand)]
        action: ConnectionAction,
    },
    /// Control Owncast connection
    Owncast {
        #[command(subcommand)]
        action: ConnectionAction,
    },
    /// Shut down the bridge
    Quit,
}

#[derive(Subcommand)]
enum ConnectionAction {
    Connect,
    Disconnect,
    Reconnect,
}

fn main() {
    let cli = Cli::parse();

    // Build the single-line command understood by the daemon's control socket.
    let command_str = match &cli.command {
        Commands::Status => "status".to_string(),
        Commands::Quit => "quit".to_string(),
        Commands::Irc { action } => format!("irc {}", action_str(action)),
        Commands::Owncast { action } => format!("owncast {}", action_str(action)),
    };

    match send_command(&cli.socket, &command_str) {
        Ok(response) => print!("{response}"),
        Err(e) => {
            eprintln!("Error: {e}");
            std::process::exit(1);
        }
    }
}

fn action_str(action: &ConnectionAction) -> &'static str {
    match action {
        ConnectionAction::Connect => "connect",
        ConnectionAction::Disconnect => "disconnect",
        ConnectionAction::Reconnect => "reconnect",
    }
}

/// Sends one command line and collects the reply until EOF or the read timeout.
fn send_command(socket_path: &Path, command: &str) -> Result<String, Box<dyn std::error::Error>> {
    let mut stream = UnixStream::connect(socket_path)?;
    stream.set_read_timeout(Some(Duration::from_secs(5)))?;

    writeln!(stream, "{command}")?;
    stream.flush()?;

    let reader = BufReader::new(stream);
    let mut response = String::new();
    for line in reader.lines() {
        match line {
            Ok(l) => {
                response.push_str(&l);
                response.push('\n');
            }
            // A read timeout surfaces as WouldBlock or TimedOut depending on the platform.
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
            Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => break,
            Err(e) => return Err(e.into()),
        }
    }
    Ok(response)
}
```

**Step 2: Verify it compiles**

Run: `cargo check --bin bridge-ctl`
Expected: compiles

**Step 3: Commit**

```bash
git add -A
git commit -m "feat: add bridge-ctl CLI for runtime control"
```

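The control protocol used by `bridge-ctl` is one command line in, a reply out, and then the daemon closes the connection. A std-only sketch of that round trip over an in-process socket pair follows. `fake_daemon` and `round_trip` are hypothetical names, and the stand-in daemon only understands `quit`; its `OK` and `ERROR: unknown command` replies mirror the strings used by the control socket in Task 10.

```rust
use std::io::{BufRead, BufReader, Read, Write};
use std::os::unix::net::UnixStream;
use std::thread;

/// Stand-in for the daemon side: read one line, answer, then close.
fn fake_daemon(stream: UnixStream) -> std::io::Result<()> {
    let mut reader = BufReader::new(stream.try_clone()?);
    let mut line = String::new();
    reader.read_line(&mut line)?;
    let mut writer = stream;
    if line.trim().eq_ignore_ascii_case("quit") {
        writer.write_all(b"OK\n")?;
    } else {
        writer.write_all(b"ERROR: unknown command\n")?;
    }
    // Dropping both handles closes the socket, so the client sees EOF.
    Ok(())
}

/// Client side: send one command and read the reply until EOF.
fn round_trip(command: &str) -> std::io::Result<String> {
    let (mut client, server) = UnixStream::pair()?;
    let daemon = thread::spawn(move || fake_daemon(server));
    writeln!(client, "{command}")?;
    client.flush()?;
    let mut response = String::new();
    client.read_to_string(&mut response)?;
    daemon.join().expect("daemon thread panicked")?;
    Ok(response)
}
```

Because the daemon closes the connection after replying, the client can simply read to EOF. The read timeout in `send_command` then only matters if the daemon stalls before answering.
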

---

### Task 14: Dockerfile

**Files:**
- Create: `Dockerfile`

**Step 1: Write Dockerfile**

```dockerfile
FROM rust:1.85-slim-bookworm AS builder

RUN apt-get update && apt-get install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src/ src/

RUN cargo build --release

FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y ca-certificates libssl3 && rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/target/release/owncast-irc-bridge /usr/local/bin/
COPY --from=builder /app/target/release/bridge-ctl /usr/local/bin/

ENTRYPOINT ["owncast-irc-bridge"]
CMD ["--config", "/etc/owncast-irc-bridge/config.toml"]
```

**Step 2: Commit**

```bash
git add -A
git commit -m "feat: add multi-stage Dockerfile"
```


---

### Task 15: Add Clone derives and final compilation fix

At this point, do a full `cargo build` and fix any missing trait derives (e.g. `Clone` on `IrcConfig`), visibility issues, or type mismatches that appear. Ensure both binaries compile.

**Step 1: Full build**

Run: `cargo build`
Expected: compiles with both binaries

**Step 2: Run all tests**

Run: `cargo test`
Expected: all tests pass

**Step 3: Commit any fixes**

```bash
git add -A
git commit -m "fix: add missing derives and resolve compilation issues"
```