Compare commits
10 Commits
cc806c55e7
...
b0236ee52b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0236ee52b
|
||
|
|
dcafdf1b20
|
||
|
|
65471fb9fc
|
||
|
|
196997f728
|
||
|
|
c24cef9d6d
|
||
|
|
ec93de9780
|
||
|
|
1e2b6d427f
|
||
|
|
f82cbfea79
|
||
|
|
f4717832f0
|
||
|
|
a7b80264d0
|
19
Dockerfile
Normal file
19
Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
FROM rust:1.85-slim-bookworm AS builder
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
COPY Cargo.toml Cargo.lock ./
|
||||||
|
COPY src/ src/
|
||||||
|
|
||||||
|
RUN cargo build --release
|
||||||
|
|
||||||
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y ca-certificates libssl3 && rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
COPY --from=builder /app/target/release/owncast-irc-bridge /usr/local/bin/
|
||||||
|
COPY --from=builder /app/target/release/bridge-ctl /usr/local/bin/
|
||||||
|
|
||||||
|
ENTRYPOINT ["owncast-irc-bridge"]
|
||||||
|
CMD ["--config", "/etc/owncast-irc-bridge/config.toml"]
|
||||||
21
LICENSE
Normal file
21
LICENSE
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
MIT License
|
||||||
|
|
||||||
|
Copyright (c) 2026 cottongin
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
||||||
111
README.md
Normal file
111
README.md
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
> [!IMPORTANT]
|
||||||
|
> This project was developed entirely with AI coding assistance (Claude Opus 4.6 via Cursor IDE) and has not undergone rigorous human review. It is provided as-is and may require adjustments for other environments.
|
||||||
|
|
||||||
|
# owncast-irc-bridge
|
||||||
|
|
||||||
|
Bidirectional chat bridge between [Owncast](https://owncast.online/) and IRC. Messages sent in your Owncast chat appear in an IRC channel and vice versa.
|
||||||
|
|
||||||
|
## Quick Start (Docker Compose)
|
||||||
|
|
||||||
|
**1. Create your config file**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cp config.example.toml config.toml
|
||||||
|
```
|
||||||
|
|
||||||
|
Edit `config.toml` with your IRC server/channel and Owncast URL.
|
||||||
|
|
||||||
|
**2. Get an Owncast access token**
|
||||||
|
|
||||||
|
In your Owncast admin panel, go to **Integrations > Access Tokens** and create a token with "send messages" permission.
|
||||||
|
|
||||||
|
**3. Set the token**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
export OWNCAST_ACCESS_TOKEN="your-token-here"
|
||||||
|
```
|
||||||
|
|
||||||
|
Or create a `.env` file (git-ignored):
|
||||||
|
|
||||||
|
```
|
||||||
|
OWNCAST_ACCESS_TOKEN=your-token-here
|
||||||
|
```
|
||||||
|
|
||||||
|
**4. Configure the Owncast webhook**
|
||||||
|
|
||||||
|
In your Owncast admin, go to **Integrations > Webhooks** and add a webhook pointing to:
|
||||||
|
|
||||||
|
```
|
||||||
|
http://<bridge-host>:9078/webhook
|
||||||
|
```
|
||||||
|
|
||||||
|
Select the events: **Chat Message**, **Stream Started**, **Stream Stopped**.
|
||||||
|
|
||||||
|
**5. Run it**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
Check logs:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker compose logs -f
|
||||||
|
```
|
||||||
|
|
||||||
|
## Running Without Docker
|
||||||
|
|
||||||
|
Requires Rust 1.75+.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cargo build --release
|
||||||
|
export OWNCAST_ACCESS_TOKEN="your-token-here"
|
||||||
|
./target/release/owncast-irc-bridge --config config.toml
|
||||||
|
```
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
See [`config.example.toml`](config.example.toml) for all options. The only required sections are `[irc]` (with `server` and `channel`) and `[owncast]` (with `url`). Everything else has defaults.
|
||||||
|
|
||||||
|
| Section | Key | Default | Description |
|
||||||
|
|---------|-----|---------|-------------|
|
||||||
|
| `irc` | `server` | *(required)* | IRC server hostname |
|
||||||
|
| `irc` | `port` | `6667` | IRC server port |
|
||||||
|
| `irc` | `tls` | `false` | Use TLS for IRC |
|
||||||
|
| `irc` | `nick` | `owncast-bridge` | IRC nickname |
|
||||||
|
| `irc` | `channel` | *(required)* | IRC channel to join |
|
||||||
|
| `owncast` | `url` | *(required)* | Owncast instance URL |
|
||||||
|
| `owncast` | `webhook_port` | `9078` | Port the webhook server listens on |
|
||||||
|
| `owncast` | `websocket_enabled` | `false` | Also connect via WebSocket (redundant with webhook, useful as fallback) |
|
||||||
|
| `owncast` | `health_poll_interval_secs` | `30` | How often to poll Owncast status |
|
||||||
|
| `bridge` | `irc_prefix` | `[IRC]` | Prefix for IRC messages in Owncast |
|
||||||
|
| `bridge` | `owncast_prefix` | `[OC]` | Prefix for Owncast messages in IRC |
|
||||||
|
| `control` | `socket_path` | `/tmp/owncast-irc-bridge.sock` | Unix socket for `bridge-ctl` |
|
||||||
|
|
||||||
|
The access token is always read from the `OWNCAST_ACCESS_TOKEN` environment variable (not the config file).
|
||||||
|
|
||||||
|
## Runtime Control
|
||||||
|
|
||||||
|
Use `bridge-ctl` to interact with a running bridge:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
bridge-ctl status # Show bridge status as JSON
|
||||||
|
bridge-ctl irc reconnect # Reconnect to IRC
|
||||||
|
bridge-ctl owncast reconnect # Reconnect to Owncast
|
||||||
|
bridge-ctl quit # Shut down the bridge
|
||||||
|
```
|
||||||
|
|
||||||
|
Inside Docker:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker compose exec bridge bridge-ctl status
|
||||||
|
```
|
||||||
|
|
||||||
|
## How It Works
|
||||||
|
|
||||||
|
- **Owncast → IRC:** Owncast sends webhook events to the bridge. The bridge formats the message and sends it to IRC via PRIVMSG.
|
||||||
|
- **IRC → Owncast:** The bridge listens for PRIVMSG in the configured channel and posts to Owncast via the integration API.
|
||||||
|
- **Deduplication:** If both webhook and WebSocket are enabled, duplicate messages are detected by ID and dropped.
|
||||||
|
- **Echo suppression:** Messages the bridge itself sent are recognized and not re-bridged.
|
||||||
|
- **Stream events:** Stream start/stop events are announced in IRC.
|
||||||
|
- **Health polling:** The bridge polls Owncast's `/api/status` endpoint and announces state changes in IRC.
|
||||||
19
chat-summaries/2026-03-10_21-00-summary.md
Normal file
19
chat-summaries/2026-03-10_21-00-summary.md
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# Debug: IRC + WebSocket connection failures
|
||||||
|
|
||||||
|
## Task
|
||||||
|
Investigated why both IRC and WebSocket connections were failing immediately after connecting, entering infinite reconnection loops with exponential backoff.
|
||||||
|
|
||||||
|
## Findings
|
||||||
|
|
||||||
|
Both issues were **external/environmental**, not code bugs:
|
||||||
|
|
||||||
|
1. **IRC** (`irc.zeronode.net`): Server was rejecting with `ERROR :Closing link: ... [No more connections allowed from your host via this connect class (global)]`. Too many existing connections from the same host IP.
|
||||||
|
|
||||||
|
2. **WebSocket** (`wss://owncast.bowlafterbowl.com/ws`): Caddy proxy successfully upgraded (101), but the Owncast backend immediately reset the connection. Owncast instance was offline.
|
||||||
|
|
||||||
|
## Changes Made
|
||||||
|
None (instrumentation added for debugging was fully reverted after diagnosis).
|
||||||
|
|
||||||
|
## Follow-up Items
|
||||||
|
- Consider logging the actual IRC ERROR message content instead of the generic "IRC stream ended" — would make future diagnosis faster without instrumentation.
|
||||||
|
- Consider detecting fatal IRC errors (connection class limits, K-lines) and stopping reconnection attempts rather than continuing to hammer the server.
|
||||||
29
chat-summaries/2026-03-10_21-30-summary.md
Normal file
29
chat-summaries/2026-03-10_21-30-summary.md
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
# Fix IRC-Owncast Bridge Issues
|
||||||
|
|
||||||
|
## Task
|
||||||
|
Fix three issues: IRC username stripped in Owncast chat, WebSocket connection always failing, and echo suppression bug.
|
||||||
|
|
||||||
|
## Changes Made
|
||||||
|
|
||||||
|
### 1. HTML-escape username in IRC->Owncast messages (`src/router.rs`)
|
||||||
|
- Changed `<username>` to `<username>` in the formatted message sent to Owncast's chat API
|
||||||
|
- Owncast renders chat as HTML, so bare angle brackets were being interpreted as HTML tags and swallowed
|
||||||
|
|
||||||
|
### 2. Fix WebSocket authentication (`src/websocket.rs`, `src/owncast_api.rs`, `src/main.rs`)
|
||||||
|
- **Root cause**: Owncast's `/ws` endpoint requires an `accessToken` query parameter from a registered chat user. The bridge was connecting without one.
|
||||||
|
- Added `register_chat_user()` to `OwncastApiClient` -- calls `POST /api/chat/register` to get a user-level access token
|
||||||
|
- Updated `run_websocket_task` to register a chat user, cache the token, and pass it as `?accessToken=` in the WebSocket URL
|
||||||
|
- Token is cached across reconnections; re-registration happens only if rejected
|
||||||
|
- Added `base_url()` accessor to `OwncastApiClient`
|
||||||
|
|
||||||
|
### 3. Add `ws_display_name` config field (`src/config.rs`, `config.toml`, `config.example.toml`)
|
||||||
|
- New `ws_display_name` field on `OwncastConfig` (default: "IRC Bridge") controls the display name of the WebSocket chat user in Owncast
|
||||||
|
|
||||||
|
### 4. Fix echo suppression bug (`src/router.rs`)
|
||||||
|
- `record_sent` was storing the full formatted string (`[IRC] <nick> body`) but `is_echo` compared against raw `body` -- they never matched
|
||||||
|
- Changed `record_sent` to store the raw `body` so echo suppression actually works
|
||||||
|
- Moved the `is_echo` check before formatting in the Owncast->IRC path to avoid unnecessary work
|
||||||
|
|
||||||
|
## Follow-up Items
|
||||||
|
- Owncast admin still needs to configure webhook URL pointing to the bridge's port 9078
|
||||||
|
- The bridge's WebSocket user will appear as "IRC Bridge" in Owncast's connected users list
|
||||||
37
chat-summaries/2026-03-10_22-00-summary.md
Normal file
37
chat-summaries/2026-03-10_22-00-summary.md
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
# Owncast–IRC Bridge: Full Implementation
|
||||||
|
|
||||||
|
## Task Description
|
||||||
|
Executed the full 15-task implementation plan at `docs/plans/2026-03-10-owncast-irc-bridge-impl.md` using the executing-plans skill. Built a complete bidirectional Rust chat bridge between Owncast and IRC.
|
||||||
|
|
||||||
|
## Changes Made (15 commits)
|
||||||
|
1. **Project scaffolding** — `cargo init`, Cargo.toml with all deps, `.gitignore`, `config.example.toml`, placeholder binaries
|
||||||
|
2. **Config module** (`src/config.rs`) — TOML parsing with serde defaults, env var support for access token, 3 tests
|
||||||
|
3. **Events module** (`src/events.rs`) — `BridgeEvent`, `Source`, `OwncastState`, `ControlCommand`, `BridgeStatus` types
|
||||||
|
4. **HTML stripping** (`src/html.rs`) — Strips HTML tags, extracts emoji `<img>` alt text, decodes entities, 6 tests
|
||||||
|
5. **Owncast API client** (`src/owncast_api.rs`) — `send_chat_message` with retry, `get_status` for health checks
|
||||||
|
6. **Health poller** (`src/health.rs`) — Periodic Owncast status polling with state change detection
|
||||||
|
7. **Webhook server** (`src/webhook.rs`) — Axum HTTP server parsing CHAT/STREAM_STARTED/STREAM_STOPPED events, 5 tests
|
||||||
|
8. **IRC task** (`src/irc_task.rs`) — `irc` crate client with exponential backoff reconnect
|
||||||
|
9. **WebSocket task** (`src/websocket.rs`) — tokio-tungstenite client with reconnect, 5 tests
|
||||||
|
10. **Control socket** (`src/control.rs`) — Unix socket listener with command parsing, 5 tests
|
||||||
|
11. **Router** (`src/router.rs`) — Central orchestration with dedup tracker, echo suppressor, state handling, 4 tests
|
||||||
|
12. **Main entry point** (`src/main.rs`) — Wires all tasks together with signal handling (SIGINT/SIGTERM/SIGHUP)
|
||||||
|
13. **bridge-ctl CLI** (`src/bin/bridge_ctl.rs`) — Clap-based CLI for runtime control via Unix socket
|
||||||
|
14. **Dockerfile** — Multi-stage build (rust:1.85-slim-bookworm builder, debian:bookworm-slim runtime)
|
||||||
|
15. **Final fixes** — Added missing `Clone` derive on `IrcConfig`, cleaned up unused imports
|
||||||
|
|
||||||
|
## Verification
|
||||||
|
- Both binaries (`owncast-irc-bridge`, `bridge-ctl`) compile
|
||||||
|
- All 28 tests pass across all modules
|
||||||
|
- Only benign warnings remain (unused struct fields for future API data)
|
||||||
|
|
||||||
|
## Bugs Found in Plan
|
||||||
|
- Raw string delimiters `r#"..."#` conflicted with TOML `"#channel"` values — fixed with `r##"..."##`
|
||||||
|
- `strip_html` had peek/consume bug (didn't advance past `<` and `&` before collecting) — fixed
|
||||||
|
- `reqwest::Response::text()` consumes `self`, so status must be captured first — fixed
|
||||||
|
|
||||||
|
## Follow-up Items
|
||||||
|
- Integration testing with actual Owncast and IRC instances
|
||||||
|
- Wire remaining control commands (connect/disconnect/reconnect) through to tasks
|
||||||
|
- Add `message_buffer_size` buffering logic
|
||||||
|
- SIGHUP config reload support
|
||||||
17
chat-summaries/2026-03-12_strip-irc-formatting-summary.md
Normal file
17
chat-summaries/2026-03-12_strip-irc-formatting-summary.md
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
# Strip IRC Formatting Codes from Owncast-bound Messages
|
||||||
|
|
||||||
|
**Date:** 2026-03-12
|
||||||
|
|
||||||
|
## Task
|
||||||
|
|
||||||
|
IRC messages forwarded to Owncast contained mIRC formatting control codes (`\x02`, `\x03`, `\x0F`, etc.) that rendered as garbage glyphs since Owncast doesn't support any message styling.
|
||||||
|
|
||||||
|
## Changes
|
||||||
|
|
||||||
|
- **New file: `src/irc_format.rs`** — `strip_formatting()` function that removes all IRC formatting control codes: bold, color (with fg/bg digit parameters), reset, monospace, reverse, italic, strikethrough, and underline. Includes 18 unit tests.
|
||||||
|
- **`src/irc_task.rs`** — Call `strip_formatting()` on the message body before constructing the `BridgeEvent`, so all downstream consumers see clean text.
|
||||||
|
- **`src/main.rs`** — Registered the new `irc_format` module.
|
||||||
|
|
||||||
|
## Follow-up
|
||||||
|
|
||||||
|
- None identified. No new dependencies added.
|
||||||
@@ -11,6 +11,7 @@ url = "https://owncast.bowlafterbowl.com"
|
|||||||
webhook_port = 9078
|
webhook_port = 9078
|
||||||
websocket_enabled = true
|
websocket_enabled = true
|
||||||
health_poll_interval_secs = 30
|
health_poll_interval_secs = 30
|
||||||
|
ws_display_name = "IRC Bridge"
|
||||||
|
|
||||||
[bridge]
|
[bridge]
|
||||||
irc_prefix = "[IRC]"
|
irc_prefix = "[IRC]"
|
||||||
|
|||||||
12
docker-compose.yml
Normal file
12
docker-compose.yml
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
services:
|
||||||
|
bridge:
|
||||||
|
build: .
|
||||||
|
container_name: owncast-irc-bridge
|
||||||
|
restart: unless-stopped
|
||||||
|
environment:
|
||||||
|
- OWNCAST_ACCESS_TOKEN=${OWNCAST_ACCESS_TOKEN}
|
||||||
|
- RUST_LOG=info
|
||||||
|
volumes:
|
||||||
|
- ./config.toml:/etc/owncast-irc-bridge/config.toml:ro
|
||||||
|
ports:
|
||||||
|
- "9078:9078"
|
||||||
@@ -1,3 +1,93 @@
|
|||||||
fn main() {
|
use std::io::{BufRead, BufReader, Write};
|
||||||
println!("bridge-ctl");
|
use std::os::unix::net::UnixStream;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
#[command(name = "bridge-ctl")]
|
||||||
|
#[command(about = "Control a running owncast-irc-bridge instance")]
|
||||||
|
struct Cli {
|
||||||
|
/// Path to the bridge control socket
|
||||||
|
#[arg(short, long, default_value = "/tmp/owncast-irc-bridge.sock")]
|
||||||
|
socket: PathBuf,
|
||||||
|
|
||||||
|
#[command(subcommand)]
|
||||||
|
command: Commands,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Subcommand)]
|
||||||
|
enum Commands {
|
||||||
|
/// Show bridge status
|
||||||
|
Status,
|
||||||
|
/// Control IRC connection
|
||||||
|
Irc {
|
||||||
|
#[command(subcommand)]
|
||||||
|
action: ConnectionAction,
|
||||||
|
},
|
||||||
|
/// Control Owncast connection
|
||||||
|
Owncast {
|
||||||
|
#[command(subcommand)]
|
||||||
|
action: ConnectionAction,
|
||||||
|
},
|
||||||
|
/// Shut down the bridge
|
||||||
|
Quit,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Subcommand)]
|
||||||
|
enum ConnectionAction {
|
||||||
|
Connect,
|
||||||
|
Disconnect,
|
||||||
|
Reconnect,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let cli = Cli::parse();
|
||||||
|
|
||||||
|
let command_str = match &cli.command {
|
||||||
|
Commands::Status => "status".to_string(),
|
||||||
|
Commands::Quit => "quit".to_string(),
|
||||||
|
Commands::Irc { action } => format!("irc {}", action_str(action)),
|
||||||
|
Commands::Owncast { action } => format!("owncast {}", action_str(action)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match send_command(&cli.socket, &command_str) {
|
||||||
|
Ok(response) => print!("{response}"),
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Error: {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn action_str(action: &ConnectionAction) -> &'static str {
|
||||||
|
match action {
|
||||||
|
ConnectionAction::Connect => "connect",
|
||||||
|
ConnectionAction::Disconnect => "disconnect",
|
||||||
|
ConnectionAction::Reconnect => "reconnect",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_command(socket_path: &PathBuf, command: &str) -> Result<String, Box<dyn std::error::Error>> {
|
||||||
|
let mut stream = UnixStream::connect(socket_path)?;
|
||||||
|
stream.set_read_timeout(Some(Duration::from_secs(5)))?;
|
||||||
|
|
||||||
|
writeln!(stream, "{command}")?;
|
||||||
|
stream.flush()?;
|
||||||
|
|
||||||
|
let reader = BufReader::new(stream);
|
||||||
|
let mut response = String::new();
|
||||||
|
for line in reader.lines() {
|
||||||
|
match line {
|
||||||
|
Ok(l) => {
|
||||||
|
response.push_str(&l);
|
||||||
|
response.push('\n');
|
||||||
|
}
|
||||||
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
|
||||||
|
Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => break,
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ pub struct BridgeConfig {
|
|||||||
pub control: ControlConfig,
|
pub control: ControlConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
pub struct IrcConfig {
|
pub struct IrcConfig {
|
||||||
pub server: String,
|
pub server: String,
|
||||||
#[serde(default = "default_irc_port")]
|
#[serde(default = "default_irc_port")]
|
||||||
@@ -33,6 +33,8 @@ pub struct OwncastConfig {
|
|||||||
pub websocket_enabled: bool,
|
pub websocket_enabled: bool,
|
||||||
#[serde(default = "default_health_poll_interval")]
|
#[serde(default = "default_health_poll_interval")]
|
||||||
pub health_poll_interval_secs: u64,
|
pub health_poll_interval_secs: u64,
|
||||||
|
#[serde(default = "default_ws_display_name")]
|
||||||
|
pub ws_display_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
@@ -70,6 +72,7 @@ fn default_webhook_port() -> u16 { 9078 }
|
|||||||
fn default_health_poll_interval() -> u64 { 30 }
|
fn default_health_poll_interval() -> u64 { 30 }
|
||||||
fn default_irc_prefix() -> String { "[IRC]".to_string() }
|
fn default_irc_prefix() -> String { "[IRC]".to_string() }
|
||||||
fn default_owncast_prefix() -> String { "[OC]".to_string() }
|
fn default_owncast_prefix() -> String { "[OC]".to_string() }
|
||||||
|
fn default_ws_display_name() -> String { "IRC Bridge".to_string() }
|
||||||
fn default_socket_path() -> String { "/tmp/owncast-irc-bridge.sock".to_string() }
|
fn default_socket_path() -> String { "/tmp/owncast-irc-bridge.sock".to_string() }
|
||||||
|
|
||||||
impl Default for BridgeSettings {
|
impl Default for BridgeSettings {
|
||||||
@@ -104,6 +107,7 @@ impl BridgeConfig {
|
|||||||
webhook_port: 9078,
|
webhook_port: 9078,
|
||||||
websocket_enabled: false,
|
websocket_enabled: false,
|
||||||
health_poll_interval_secs: 30,
|
health_poll_interval_secs: 30,
|
||||||
|
ws_display_name: default_ws_display_name(),
|
||||||
},
|
},
|
||||||
bridge: BridgeSettings::default(),
|
bridge: BridgeSettings::default(),
|
||||||
control: ControlConfig::default(),
|
control: ControlConfig::default(),
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::net::UnixListener;
|
use tokio::net::UnixListener;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tracing::{info, warn};
|
use tracing::info;
|
||||||
|
|
||||||
use crate::events::{BridgeStatus, ControlCommand};
|
use crate::events::ControlCommand;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
enum ParsedCommand {
|
enum ParsedCommand {
|
||||||
|
|||||||
159
src/irc_format.rs
Normal file
159
src/irc_format.rs
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
/// Strip mIRC-style formatting control codes from a string.
|
||||||
|
///
|
||||||
|
/// Removes bold (\x02), color (\x03 + optional fg[,bg] digits), reset (\x0F),
|
||||||
|
/// monospace (\x11), reverse (\x16), italic (\x1D), strikethrough (\x1E),
|
||||||
|
/// and underline (\x1F).
|
||||||
|
pub fn strip_formatting(input: &str) -> String {
|
||||||
|
let bytes = input.as_bytes();
|
||||||
|
let len = bytes.len();
|
||||||
|
let mut out: Vec<u8> = Vec::with_capacity(len);
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
|
while i < len {
|
||||||
|
match bytes[i] {
|
||||||
|
b'\x02' | b'\x0F' | b'\x11' | b'\x16' | b'\x1D' | b'\x1E' | b'\x1F' => {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
b'\x03' => {
|
||||||
|
i += 1;
|
||||||
|
let mut digits = 0;
|
||||||
|
while i < len && digits < 2 && bytes[i].is_ascii_digit() {
|
||||||
|
i += 1;
|
||||||
|
digits += 1;
|
||||||
|
}
|
||||||
|
if i < len && bytes[i] == b',' && i + 1 < len && bytes[i + 1].is_ascii_digit() {
|
||||||
|
i += 1;
|
||||||
|
digits = 0;
|
||||||
|
while i < len && digits < 2 && bytes[i].is_ascii_digit() {
|
||||||
|
i += 1;
|
||||||
|
digits += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b => {
|
||||||
|
out.push(b);
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IRC control codes are single-byte ASCII (< 0x80) so removing them from
|
||||||
|
// valid UTF-8 always yields valid UTF-8.
|
||||||
|
String::from_utf8(out).expect("stripping ASCII control codes preserves UTF-8")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn clean_text_unchanged() {
|
||||||
|
assert_eq!(strip_formatting("hello world"), "hello world");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_bold() {
|
||||||
|
assert_eq!(strip_formatting("\x02bold\x02"), "bold");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_italic() {
|
||||||
|
assert_eq!(strip_formatting("\x1Ditalic\x1D"), "italic");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_underline() {
|
||||||
|
assert_eq!(strip_formatting("\x1Funderline\x1F"), "underline");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_reset() {
|
||||||
|
assert_eq!(strip_formatting("styled\x0F plain"), "styled plain");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_color_no_params() {
|
||||||
|
assert_eq!(strip_formatting("\x03hello"), "hello");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_color_fg_only() {
|
||||||
|
assert_eq!(strip_formatting("\x034red text"), "red text");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_color_two_digit_fg() {
|
||||||
|
assert_eq!(strip_formatting("\x0312blue text"), "blue text");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_color_fg_and_bg() {
|
||||||
|
assert_eq!(strip_formatting("\x034,2red on blue"), "red on blue");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_color_two_digit_fg_and_bg() {
|
||||||
|
assert_eq!(strip_formatting("\x0304,12colored"), "colored");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn color_comma_without_bg_digit_preserves_comma() {
|
||||||
|
// \x03 followed by digit then comma but no bg digit — comma is kept
|
||||||
|
assert_eq!(strip_formatting("\x034,text"), ",text");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn mixed_codes() {
|
||||||
|
assert_eq!(
|
||||||
|
strip_formatting("\x02\x034,5bold color\x0F normal"),
|
||||||
|
"bold color normal"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn color_at_end_of_string() {
|
||||||
|
assert_eq!(strip_formatting("text\x03"), "text");
|
||||||
|
assert_eq!(strip_formatting("text\x034"), "text");
|
||||||
|
assert_eq!(strip_formatting("text\x0304,"), "text,");
|
||||||
|
assert_eq!(strip_formatting("text\x0304,1"), "text");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_monospace() {
|
||||||
|
assert_eq!(strip_formatting("\x11code\x11"), "code");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_reverse() {
|
||||||
|
assert_eq!(strip_formatting("\x16reversed\x16"), "reversed");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_strikethrough() {
|
||||||
|
assert_eq!(strip_formatting("\x1Estruck\x1E"), "struck");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn empty_input() {
|
||||||
|
assert_eq!(strip_formatting(""), "");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn preserves_multibyte_utf8() {
|
||||||
|
assert_eq!(strip_formatting("✨ hello ⚡"), "✨ hello ⚡");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn strips_codes_around_emoji() {
|
||||||
|
// \x034 = color fg 4; \x0333 = color fg 33, leaving trailing "3"
|
||||||
|
assert_eq!(
|
||||||
|
strip_formatting("\x02✨\x02 boosted \x034⚡\x03333 sats"),
|
||||||
|
"✨ boosted ⚡3 sats"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn preserves_cjk_and_accented_chars() {
|
||||||
|
assert_eq!(strip_formatting("\x02café\x02 日本語"), "café 日本語");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,7 +3,7 @@ use std::time::Duration;
|
|||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use irc::client::prelude::*;
|
use irc::client::prelude::*;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info};
|
||||||
|
|
||||||
use crate::config::IrcConfig;
|
use crate::config::IrcConfig;
|
||||||
use crate::events::{BridgeEvent, Source};
|
use crate::events::{BridgeEvent, Source};
|
||||||
@@ -74,7 +74,7 @@ async fn connect_and_run(
|
|||||||
let event = BridgeEvent::ChatMessage {
|
let event = BridgeEvent::ChatMessage {
|
||||||
source: Source::Irc,
|
source: Source::Irc,
|
||||||
username: nick,
|
username: nick,
|
||||||
body: text.clone(),
|
body: crate::irc_format::strip_formatting(text),
|
||||||
id: None,
|
id: None,
|
||||||
};
|
};
|
||||||
if event_tx.send(event).await.is_err() {
|
if event_tx.send(event).await.is_err() {
|
||||||
|
|||||||
132
src/main.rs
132
src/main.rs
@@ -3,11 +3,139 @@ mod control;
|
|||||||
mod events;
|
mod events;
|
||||||
mod health;
|
mod health;
|
||||||
mod html;
|
mod html;
|
||||||
|
mod irc_format;
|
||||||
mod irc_task;
|
mod irc_task;
|
||||||
mod owncast_api;
|
mod owncast_api;
|
||||||
|
mod router;
|
||||||
mod webhook;
|
mod webhook;
|
||||||
mod websocket;
|
mod websocket;
|
||||||
|
|
||||||
fn main() {
|
use std::path::PathBuf;
|
||||||
println!("owncast-irc-bridge");
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
#[command(name = "owncast-irc-bridge")]
|
||||||
|
#[command(about = "Bidirectional chat bridge between Owncast and IRC")]
|
||||||
|
struct Cli {
|
||||||
|
/// Path to config file
|
||||||
|
#[arg(short, long, default_value = "config.toml")]
|
||||||
|
config: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(
|
||||||
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||||
|
)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let cli = Cli::parse();
|
||||||
|
let config = config::BridgeConfig::load(&cli.config)?;
|
||||||
|
let access_token = config.owncast_access_token()?;
|
||||||
|
|
||||||
|
info!("Starting owncast-irc-bridge");
|
||||||
|
|
||||||
|
let start_time = Instant::now();
|
||||||
|
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||||
|
|
||||||
|
let (event_tx, event_rx) = mpsc::channel(256);
|
||||||
|
let (irc_outbound_tx, irc_outbound_rx) = mpsc::channel(256);
|
||||||
|
let (state_tx, state_rx) = mpsc::channel(32);
|
||||||
|
let (control_tx, control_rx) = mpsc::channel(32);
|
||||||
|
|
||||||
|
let api_client = owncast_api::OwncastApiClient::new(
|
||||||
|
config.owncast.url.clone(),
|
||||||
|
access_token,
|
||||||
|
);
|
||||||
|
|
||||||
|
let irc_config = config.irc.clone();
|
||||||
|
let irc_event_tx = event_tx.clone();
|
||||||
|
let irc_shutdown = shutdown_rx.clone();
|
||||||
|
let _irc_handle = tokio::spawn(async move {
|
||||||
|
irc_task::run_irc_task(irc_config, irc_event_tx, irc_outbound_rx, irc_shutdown).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let webhook_port = config.owncast.webhook_port;
|
||||||
|
let webhook_event_tx = event_tx.clone();
|
||||||
|
let _webhook_handle = tokio::spawn(async move {
|
||||||
|
if let Err(e) = webhook::run_webhook_server(webhook_port, webhook_event_tx).await {
|
||||||
|
tracing::error!(error = %e, "Webhook server failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ws_handle = if config.owncast.websocket_enabled {
|
||||||
|
let ws_api = owncast_api::OwncastApiClient::new(
|
||||||
|
config.owncast.url.clone(),
|
||||||
|
String::new(),
|
||||||
|
);
|
||||||
|
let ws_display_name = config.owncast.ws_display_name.clone();
|
||||||
|
let ws_event_tx = event_tx.clone();
|
||||||
|
let ws_shutdown = shutdown_rx.clone();
|
||||||
|
Some(tokio::spawn(async move {
|
||||||
|
websocket::run_websocket_task(ws_api, ws_display_name, ws_event_tx, ws_shutdown).await;
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let health_api = owncast_api::OwncastApiClient::new(
|
||||||
|
config.owncast.url.clone(),
|
||||||
|
String::new(),
|
||||||
|
);
|
||||||
|
let health_interval = std::time::Duration::from_secs(config.owncast.health_poll_interval_secs);
|
||||||
|
let health_shutdown = shutdown_rx.clone();
|
||||||
|
let _health_handle = tokio::spawn(async move {
|
||||||
|
health::run_health_poller(&health_api, state_tx, health_interval, health_shutdown).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let control_socket_path = config.control.socket_path.clone();
|
||||||
|
let _control_handle = tokio::spawn(async move {
|
||||||
|
if let Err(e) = control::run_control_socket(&control_socket_path, control_tx).await {
|
||||||
|
tracing::error!(error = %e, "Control socket failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let sig_shutdown_tx = shutdown_tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||||
|
.expect("Failed to register SIGTERM handler");
|
||||||
|
let mut sighup = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
|
||||||
|
.expect("Failed to register SIGHUP handler");
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = tokio::signal::ctrl_c() => {
|
||||||
|
info!("SIGINT received, shutting down");
|
||||||
|
let _ = sig_shutdown_tx.send(true);
|
||||||
|
}
|
||||||
|
_ = sigterm.recv() => {
|
||||||
|
info!("SIGTERM received, shutting down");
|
||||||
|
let _ = sig_shutdown_tx.send(true);
|
||||||
|
}
|
||||||
|
_ = sighup.recv() => {
|
||||||
|
info!("SIGHUP received (reconnect not yet wired)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
router::run_router(
|
||||||
|
config.bridge,
|
||||||
|
config.owncast.url,
|
||||||
|
api_client,
|
||||||
|
event_rx,
|
||||||
|
irc_outbound_tx,
|
||||||
|
state_rx,
|
||||||
|
control_rx,
|
||||||
|
shutdown_tx,
|
||||||
|
start_time,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
info!("Bridge shutting down");
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,12 +58,44 @@ impl OwncastApiClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn register_chat_user(&self, display_name: &str) -> anyhow::Result<ChatRegistration> {
|
||||||
|
let url = format!("{}/api/chat/register", self.base_url);
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.post(&url)
|
||||||
|
.json(&serde_json::json!({ "displayName": display_name }))
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let status = resp.status();
|
||||||
|
if !status.is_success() {
|
||||||
|
let body = resp.text().await.unwrap_or_default();
|
||||||
|
anyhow::bail!("Chat registration failed ({status}): {body}");
|
||||||
|
}
|
||||||
|
|
||||||
|
let reg: ChatRegistration = resp.json().await?;
|
||||||
|
Ok(reg)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_status(&self) -> anyhow::Result<OwncastStatus> {
|
pub async fn get_status(&self) -> anyhow::Result<OwncastStatus> {
|
||||||
let url = format!("{}/api/status", self.base_url);
|
let url = format!("{}/api/status", self.base_url);
|
||||||
let resp = self.client.get(&url).send().await?;
|
let resp = self.client.get(&url).send().await?;
|
||||||
let status: OwncastStatus = resp.json().await?;
|
let status: OwncastStatus = resp.json().await?;
|
||||||
Ok(status)
|
Ok(status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn base_url(&self) -> &str {
|
||||||
|
&self.base_url
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize)]
|
||||||
|
pub struct ChatRegistration {
|
||||||
|
pub id: String,
|
||||||
|
#[serde(rename = "accessToken")]
|
||||||
|
pub access_token: String,
|
||||||
|
#[serde(rename = "displayName")]
|
||||||
|
pub display_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
|||||||
236
src/router.rs
Normal file
236
src/router.rs
Normal file
@@ -0,0 +1,236 @@
|
|||||||
|
use std::collections::{HashSet, VecDeque};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::config::BridgeSettings;
|
||||||
|
use crate::events::{BridgeEvent, ControlCommand, OwncastState, Source};
|
||||||
|
use crate::owncast_api::OwncastApiClient;
|
||||||
|
|
||||||
|
pub struct DedupTracker {
|
||||||
|
seen: VecDeque<String>,
|
||||||
|
set: HashSet<String>,
|
||||||
|
capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DedupTracker {
|
||||||
|
pub fn new(capacity: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
seen: VecDeque::with_capacity(capacity),
|
||||||
|
set: HashSet::with_capacity(capacity),
|
||||||
|
capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_duplicate(&mut self, id: &str) -> bool {
|
||||||
|
if self.set.contains(id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if self.seen.len() >= self.capacity {
|
||||||
|
if let Some(old) = self.seen.pop_front() {
|
||||||
|
self.set.remove(&old);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.set.insert(id.to_string());
|
||||||
|
self.seen.push_back(id.to_string());
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EchoSuppressor {
|
||||||
|
recent: VecDeque<String>,
|
||||||
|
capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EchoSuppressor {
|
||||||
|
pub fn new(capacity: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
recent: VecDeque::with_capacity(capacity),
|
||||||
|
capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn record_sent(&mut self, body: &str) {
|
||||||
|
if self.recent.len() >= self.capacity {
|
||||||
|
self.recent.pop_front();
|
||||||
|
}
|
||||||
|
self.recent.push_back(body.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_echo(&mut self, body: &str) -> bool {
|
||||||
|
if let Some(pos) = self.recent.iter().position(|s| s == body) {
|
||||||
|
self.recent.remove(pos);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run_router(
|
||||||
|
settings: BridgeSettings,
|
||||||
|
owncast_url: String,
|
||||||
|
api_client: OwncastApiClient,
|
||||||
|
mut event_rx: mpsc::Receiver<BridgeEvent>,
|
||||||
|
irc_outbound_tx: mpsc::Sender<String>,
|
||||||
|
mut state_rx: mpsc::Receiver<OwncastState>,
|
||||||
|
mut control_rx: mpsc::Receiver<ControlCommand>,
|
||||||
|
shutdown_tx: tokio::sync::watch::Sender<bool>,
|
||||||
|
start_time: Instant,
|
||||||
|
) {
|
||||||
|
let mut dedup = DedupTracker::new(500);
|
||||||
|
let mut echo_suppressor = EchoSuppressor::new(50);
|
||||||
|
let mut owncast_state = OwncastState::Unavailable;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(event) = event_rx.recv() => {
|
||||||
|
handle_event(
|
||||||
|
event,
|
||||||
|
&settings,
|
||||||
|
&owncast_url,
|
||||||
|
&api_client,
|
||||||
|
&irc_outbound_tx,
|
||||||
|
&mut dedup,
|
||||||
|
&mut echo_suppressor,
|
||||||
|
&owncast_state,
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
Some(new_state) = state_rx.recv() => {
|
||||||
|
if new_state != owncast_state {
|
||||||
|
handle_state_change(&new_state, &owncast_state, &irc_outbound_tx).await;
|
||||||
|
owncast_state = new_state;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(cmd) = control_rx.recv() => {
|
||||||
|
match cmd {
|
||||||
|
ControlCommand::Quit => {
|
||||||
|
info!("Quit command received, shutting down");
|
||||||
|
let _ = shutdown_tx.send(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ControlCommand::Status { reply } => {
|
||||||
|
let status = crate::events::BridgeStatus {
|
||||||
|
irc_connected: true,
|
||||||
|
owncast_state: format!("{:?}", owncast_state),
|
||||||
|
webhook_listening: true,
|
||||||
|
websocket_connected: false,
|
||||||
|
uptime_secs: start_time.elapsed().as_secs(),
|
||||||
|
};
|
||||||
|
let _ = reply.send(status);
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
warn!(command = ?other, "Control command not yet implemented in router");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_event(
|
||||||
|
event: BridgeEvent,
|
||||||
|
settings: &BridgeSettings,
|
||||||
|
owncast_url: &str,
|
||||||
|
api_client: &OwncastApiClient,
|
||||||
|
irc_tx: &mpsc::Sender<String>,
|
||||||
|
dedup: &mut DedupTracker,
|
||||||
|
echo: &mut EchoSuppressor,
|
||||||
|
owncast_state: &OwncastState,
|
||||||
|
) {
|
||||||
|
match event {
|
||||||
|
BridgeEvent::ChatMessage { source, username, body, id } => {
|
||||||
|
if let Some(ref msg_id) = id {
|
||||||
|
if dedup.is_duplicate(msg_id) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match source {
|
||||||
|
Source::Irc => {
|
||||||
|
if *owncast_state == OwncastState::Unavailable {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let formatted = format!("{} <{}> {}", settings.irc_prefix, username, body);
|
||||||
|
echo.record_sent(&body);
|
||||||
|
if let Err(e) = api_client.send_chat_message(&formatted).await {
|
||||||
|
warn!(error = %e, "Failed to send to Owncast");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Source::Owncast => {
|
||||||
|
if echo.is_echo(&body) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let formatted = format!("{} <{}> {}", settings.owncast_prefix, username, body);
|
||||||
|
if irc_tx.send(formatted).await.is_err() {
|
||||||
|
warn!("IRC outbound channel closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
BridgeEvent::StreamStarted { title } => {
|
||||||
|
let msg = if title.is_empty() {
|
||||||
|
format!("Stream started — {}", owncast_url)
|
||||||
|
} else {
|
||||||
|
format!("Stream started: {} — {}", title, owncast_url)
|
||||||
|
};
|
||||||
|
let _ = irc_tx.send(msg).await;
|
||||||
|
}
|
||||||
|
BridgeEvent::StreamStopped => {
|
||||||
|
let _ = irc_tx.send("Stream ended.".to_string()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_state_change(
|
||||||
|
new: &OwncastState,
|
||||||
|
_old: &OwncastState,
|
||||||
|
irc_tx: &mpsc::Sender<String>,
|
||||||
|
) {
|
||||||
|
match new {
|
||||||
|
OwncastState::Online => {
|
||||||
|
let _ = irc_tx.send("Owncast chat is now available.".to_string()).await;
|
||||||
|
}
|
||||||
|
OwncastState::Unavailable => {
|
||||||
|
let _ = irc_tx.send("Owncast chat is currently unavailable.".to_string()).await;
|
||||||
|
}
|
||||||
|
OwncastState::OfflineChatOpen => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_dedup_tracker_new_id() {
|
||||||
|
let mut tracker = DedupTracker::new(100);
|
||||||
|
assert!(!tracker.is_duplicate("msg-1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_dedup_tracker_duplicate() {
|
||||||
|
let mut tracker = DedupTracker::new(100);
|
||||||
|
assert!(!tracker.is_duplicate("msg-1"));
|
||||||
|
assert!(tracker.is_duplicate("msg-1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_dedup_tracker_evicts_old() {
|
||||||
|
let mut tracker = DedupTracker::new(2);
|
||||||
|
tracker.is_duplicate("a");
|
||||||
|
tracker.is_duplicate("b");
|
||||||
|
tracker.is_duplicate("c");
|
||||||
|
assert!(!tracker.is_duplicate("a"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_echo_suppressor() {
|
||||||
|
let mut suppressor = EchoSuppressor::new(10);
|
||||||
|
suppressor.record_sent("hello from IRC");
|
||||||
|
assert!(suppressor.is_echo("hello from IRC"));
|
||||||
|
assert!(!suppressor.is_echo("different message"));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,17 +7,33 @@ use tracing::{info, warn};
|
|||||||
|
|
||||||
use crate::events::{BridgeEvent, Source};
|
use crate::events::{BridgeEvent, Source};
|
||||||
use crate::html::strip_html;
|
use crate::html::strip_html;
|
||||||
|
use crate::owncast_api::OwncastApiClient;
|
||||||
|
|
||||||
pub async fn run_websocket_task(
|
pub async fn run_websocket_task(
|
||||||
owncast_url: String,
|
api_client: OwncastApiClient,
|
||||||
|
display_name: String,
|
||||||
event_tx: mpsc::Sender<BridgeEvent>,
|
event_tx: mpsc::Sender<BridgeEvent>,
|
||||||
mut shutdown: tokio::sync::watch::Receiver<bool>,
|
mut shutdown: tokio::sync::watch::Receiver<bool>,
|
||||||
) {
|
) {
|
||||||
let mut backoff = Duration::from_secs(1);
|
let mut backoff = Duration::from_secs(1);
|
||||||
let max_backoff = Duration::from_secs(60);
|
let max_backoff = Duration::from_secs(60);
|
||||||
|
let mut cached_token: Option<String> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let ws_url = build_ws_url(&owncast_url);
|
let token = match obtain_token(&api_client, &display_name, &mut cached_token).await {
|
||||||
|
Some(t) => t,
|
||||||
|
None => {
|
||||||
|
warn!("Failed to register chat user for WebSocket, retrying after backoff");
|
||||||
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(backoff) => {},
|
||||||
|
_ = shutdown.changed() => return,
|
||||||
|
}
|
||||||
|
backoff = (backoff * 2).min(max_backoff);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let ws_url = build_ws_url(api_client.base_url(), &token);
|
||||||
info!(url = %ws_url, "Connecting to Owncast WebSocket");
|
info!(url = %ws_url, "Connecting to Owncast WebSocket");
|
||||||
|
|
||||||
match connect_and_listen(&ws_url, &event_tx, &mut shutdown).await {
|
match connect_and_listen(&ws_url, &event_tx, &mut shutdown).await {
|
||||||
@@ -27,6 +43,13 @@ pub async fn run_websocket_task(
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(error = %e, "WebSocket connection error");
|
warn!(error = %e, "WebSocket connection error");
|
||||||
|
|
||||||
|
let err_str = e.to_string();
|
||||||
|
if err_str.contains("403") || err_str.contains("NeedsRegistration") {
|
||||||
|
info!("Token rejected, will re-register");
|
||||||
|
cached_token = None;
|
||||||
|
}
|
||||||
|
|
||||||
info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff");
|
info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff");
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -40,14 +63,37 @@ pub async fn run_websocket_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_ws_url(base_url: &str) -> String {
|
async fn obtain_token(
|
||||||
|
api_client: &OwncastApiClient,
|
||||||
|
display_name: &str,
|
||||||
|
cached: &mut Option<String>,
|
||||||
|
) -> Option<String> {
|
||||||
|
if let Some(ref token) = cached {
|
||||||
|
return Some(token.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
match api_client.register_chat_user(display_name).await {
|
||||||
|
Ok(reg) => {
|
||||||
|
info!(user_id = %reg.id, name = %reg.display_name, "Registered WebSocket chat user");
|
||||||
|
let token = reg.access_token.clone();
|
||||||
|
*cached = Some(token.clone());
|
||||||
|
Some(token)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = %e, "Chat user registration failed");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_ws_url(base_url: &str, access_token: &str) -> String {
|
||||||
let base = base_url.trim_end_matches('/');
|
let base = base_url.trim_end_matches('/');
|
||||||
let ws_base = if base.starts_with("https://") {
|
let ws_base = if base.starts_with("https://") {
|
||||||
base.replacen("https://", "wss://", 1)
|
base.replacen("https://", "wss://", 1)
|
||||||
} else {
|
} else {
|
||||||
base.replacen("http://", "ws://", 1)
|
base.replacen("http://", "ws://", 1)
|
||||||
};
|
};
|
||||||
format!("{}/ws", ws_base)
|
format!("{}/ws?accessToken={}", ws_base, access_token)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect_and_listen(
|
async fn connect_and_listen(
|
||||||
@@ -115,24 +161,24 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_build_ws_url_https() {
|
fn test_build_ws_url_https() {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
build_ws_url("https://owncast.example.com"),
|
build_ws_url("https://owncast.example.com", "tok123"),
|
||||||
"wss://owncast.example.com/ws"
|
"wss://owncast.example.com/ws?accessToken=tok123"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_build_ws_url_http() {
|
fn test_build_ws_url_http() {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
build_ws_url("http://localhost:8080"),
|
build_ws_url("http://localhost:8080", "tok123"),
|
||||||
"ws://localhost:8080/ws"
|
"ws://localhost:8080/ws?accessToken=tok123"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_build_ws_url_trailing_slash() {
|
fn test_build_ws_url_trailing_slash() {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
build_ws_url("https://owncast.example.com/"),
|
build_ws_url("https://owncast.example.com/", "tok123"),
|
||||||
"wss://owncast.example.com/ws"
|
"wss://owncast.example.com/ws?accessToken=tok123"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user