Files
owncast-IRC-bridge/src/websocket.rs

202 lines
6.2 KiB
Rust
Raw Normal View History

use std::time::Duration;
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_tungstenite::connect_async;
use tracing::{info, warn};
use crate::events::{BridgeEvent, Source};
use crate::html::strip_html;
use crate::owncast_api::OwncastApiClient;
/// Runs the Owncast WebSocket listener until shutdown, reconnecting on failure.
///
/// Every iteration obtains a chat access token (reusing the cached one when
/// available), builds the WebSocket URL from `api_client.base_url()` and hands
/// the connection to `connect_and_listen`. Failures are retried with an
/// exponential backoff that starts at 1 second, doubles after each failure and
/// is capped at 60 seconds. The backoff is reset to its initial value once a
/// connection attempt has lasted at least as long as the cap, so a long-lived
/// connection that eventually drops is retried promptly.
///
/// # Parameters
/// - `api_client`: client used to register the chat user and to read the base URL.
/// - `display_name`: name passed to chat-user registration.
/// - `event_tx`: channel handed to `connect_and_listen` for forwarding events.
/// - `shutdown`: watch receiver; the task returns as soon as `changed()` resolves
///   while it is waiting or listening.
pub async fn run_websocket_task(
    api_client: OwncastApiClient,
    display_name: String,
    event_tx: mpsc::Sender<BridgeEvent>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    let initial_backoff = Duration::from_secs(1);
    let max_backoff = Duration::from_secs(60);
    let mut backoff = initial_backoff;
    let mut cached_token: Option<String> = None;

    loop {
        let token = match obtain_token(&api_client, &display_name, &mut cached_token).await {
            Some(t) => t,
            None => {
                warn!("Failed to register chat user for WebSocket, retrying after backoff");
                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {},
                    _ = shutdown.changed() => return,
                }
                backoff = (backoff * 2).min(max_backoff);
                continue;
            }
        };

        let base_url = api_client.base_url();
        let ws_url = build_ws_url(base_url, &token);
        // The full URL carries the access token in its query string, so only
        // the base URL is written to the log.
        info!(base_url = %base_url, "Connecting to Owncast WebSocket");

        let attempt_started = tokio::time::Instant::now();
        match connect_and_listen(&ws_url, &event_tx, &mut shutdown).await {
            Ok(()) => {
                info!("WebSocket task exiting cleanly");
                return;
            }
            Err(e) => {
                warn!(error = %e, "WebSocket connection error");

                // The error is inspected by its display text: either marker
                // means the cached token must be discarded and re-registered.
                let err_str = e.to_string();
                if err_str.contains("403") || err_str.contains("NeedsRegistration") {
                    info!("Token rejected, will re-register");
                    cached_token = None;
                }

                // An attempt that survived for at least the maximum backoff is
                // treated as a healthy connection that dropped, not as part of
                // a failure streak, so the delay starts over.
                if attempt_started.elapsed() >= max_backoff {
                    backoff = initial_backoff;
                }

                info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff");
                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {},
                    _ = shutdown.changed() => return,
                }
                backoff = (backoff * 2).min(max_backoff);
            }
        }
    }
}
async fn obtain_token(
api_client: &OwncastApiClient,
display_name: &str,
cached: &mut Option<String>,
) -> Option<String> {
if let Some(ref token) = cached {
return Some(token.clone());
}
match api_client.register_chat_user(display_name).await {
Ok(reg) => {
info!(user_id = %reg.id, name = %reg.display_name, "Registered WebSocket chat user");
let token = reg.access_token.clone();
*cached = Some(token.clone());
Some(token)
}
Err(e) => {
warn!(error = %e, "Chat user registration failed");
None
}
}
}
/// Builds the Owncast chat WebSocket URL for `base_url` and `access_token`.
///
/// Trailing slashes are trimmed from `base_url`, a leading `https` scheme is
/// mapped to `wss` and a leading `http` scheme to `ws` (compared
/// case-insensitively), and `/ws?accessToken=<token>` is appended. Any other
/// input, including URLs that already use `ws`/`wss` or have no scheme, is
/// passed through unchanged apart from the trimming and the appended path.
///
/// Only the scheme in front of the first `://` is rewritten, so an `http://`
/// that appears later in the URL is never touched.
fn build_ws_url(base_url: &str, access_token: &str) -> String {
    let base = base_url.trim_end_matches('/');
    let ws_base = match base.split_once("://") {
        Some((scheme, rest)) if scheme.eq_ignore_ascii_case("https") => {
            format!("wss://{}", rest)
        }
        Some((scheme, rest)) if scheme.eq_ignore_ascii_case("http") => {
            format!("ws://{}", rest)
        }
        _ => base.to_string(),
    };
    format!("{}/ws?accessToken={}", ws_base, access_token)
}
/// Connects to `ws_url` and forwards parsed events to `event_tx` until the
/// connection ends or shutdown is signalled.
///
/// Owncast's server may batch several queued events into a single text frame,
/// separated by newlines (its web client splits frames on `\n`), so every
/// non-empty line of a frame is parsed on its own. Frames that cannot be
/// converted to text, and lines that `parse_ws_message` rejects, are skipped.
///
/// # Returns
/// `Ok(())` when `shutdown.changed()` resolves or when sending on `event_tx`
/// fails.
///
/// # Errors
/// Returns an error when the connection cannot be established, when the
/// stream yields an error, or when the stream ends.
async fn connect_and_listen(
    ws_url: &str,
    event_tx: &mpsc::Sender<BridgeEvent>,
    shutdown: &mut tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<()> {
    let (ws_stream, _) = connect_async(ws_url).await?;
    // Only the read half is used; the write half stays bound to `_write` for
    // the duration of the loop.
    let (_write, mut read) = ws_stream.split();
    info!("WebSocket connected");

    loop {
        tokio::select! {
            msg = read.next() => {
                let ws_msg = match msg {
                    Some(Ok(ws_msg)) => ws_msg,
                    Some(Err(e)) => return Err(e.into()),
                    None => return Err(anyhow::anyhow!("WebSocket stream ended")),
                };

                if let Ok(text) = ws_msg.into_text() {
                    // One frame can hold several newline-separated JSON events.
                    for line in text.lines().filter(|line| !line.trim().is_empty()) {
                        if let Some(event) = parse_ws_message(line) {
                            if event_tx.send(event).await.is_err() {
                                return Ok(());
                            }
                        }
                    }
                }
            }
            _ = shutdown.changed() => return Ok(()),
        }
    }
}
/// Converts one Owncast WebSocket JSON payload into a [`BridgeEvent`].
///
/// Only payloads whose `type` is `"CHAT"` produce an event. `None` is returned
/// when the text is not valid JSON, when `type`, `user`, `user.displayName` or
/// `body` is missing or not of the expected kind, when the type is anything
/// other than `"CHAT"`, or when `user.isBot` is `true`. The message body is
/// passed through `strip_html`; `id` is optional and copied when it is a string.
fn parse_ws_message(text: &str) -> Option<BridgeEvent> {
    let payload: serde_json::Value = serde_json::from_str(text).ok()?;

    if payload.get("type")?.as_str()? != "CHAT" {
        return None;
    }

    let user = payload.get("user")?;
    // A missing or non-boolean `isBot` counts as "not a bot".
    let sent_by_bot = user.get("isBot").and_then(|flag| flag.as_bool()) == Some(true);
    if sent_by_bot {
        return None;
    }

    let username = user.get("displayName")?.as_str()?.to_string();
    let raw_body = payload.get("body")?.as_str()?;
    let id = payload
        .get("id")
        .and_then(|raw_id| raw_id.as_str())
        .map(String::from);

    Some(BridgeEvent::ChatMessage {
        source: Source::Owncast,
        username,
        body: strip_html(raw_body),
        id,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An `https://` base URL yields a `wss://` WebSocket URL.
    #[test]
    fn test_build_ws_url_https() {
        let url = build_ws_url("https://owncast.example.com", "tok123");
        assert_eq!(url, "wss://owncast.example.com/ws?accessToken=tok123");
    }

    /// An `http://` base URL yields a `ws://` WebSocket URL.
    #[test]
    fn test_build_ws_url_http() {
        let url = build_ws_url("http://localhost:8080", "tok123");
        assert_eq!(url, "ws://localhost:8080/ws?accessToken=tok123");
    }

    /// A trailing slash on the base URL does not produce a double slash.
    #[test]
    fn test_build_ws_url_trailing_slash() {
        let url = build_ws_url("https://owncast.example.com/", "tok123");
        assert_eq!(url, "wss://owncast.example.com/ws?accessToken=tok123");
    }

    /// A CHAT payload from a non-bot user becomes a chat message event.
    #[test]
    fn test_parse_ws_chat_message() {
        let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"viewer","isBot":false}}"#;
        let event = parse_ws_message(json);
        match &event {
            Some(BridgeEvent::ChatMessage { username, body, .. }) => {
                assert!(username == "viewer" && body == "hello");
            }
            _ => panic!("expected a chat message event"),
        }
    }

    /// A CHAT payload flagged as coming from a bot is dropped.
    #[test]
    fn test_parse_ws_bot_message_ignored() {
        let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"bot","isBot":true}}"#;
        let event = parse_ws_message(json);
        assert!(event.is_none());
    }
}