use std::time::Duration; use futures_util::StreamExt; use tokio::sync::mpsc; use tokio_tungstenite::connect_async; use tracing::{info, warn}; use crate::events::{BridgeEvent, Source}; use crate::html::strip_html; use crate::owncast_api::OwncastApiClient; pub async fn run_websocket_task( api_client: OwncastApiClient, display_name: String, event_tx: mpsc::Sender, mut shutdown: tokio::sync::watch::Receiver, ) { let mut backoff = Duration::from_secs(1); let max_backoff = Duration::from_secs(60); let mut cached_token: Option = None; loop { let token = match obtain_token(&api_client, &display_name, &mut cached_token).await { Some(t) => t, None => { warn!("Failed to register chat user for WebSocket, retrying after backoff"); tokio::select! { _ = tokio::time::sleep(backoff) => {}, _ = shutdown.changed() => return, } backoff = (backoff * 2).min(max_backoff); continue; } }; let ws_url = build_ws_url(api_client.base_url(), &token); info!(url = %ws_url, "Connecting to Owncast WebSocket"); match connect_and_listen(&ws_url, &event_tx, &mut shutdown).await { Ok(()) => { info!("WebSocket task exiting cleanly"); return; } Err(e) => { warn!(error = %e, "WebSocket connection error"); let err_str = e.to_string(); if err_str.contains("403") || err_str.contains("NeedsRegistration") { info!("Token rejected, will re-register"); cached_token = None; } info!(backoff_secs = backoff.as_secs(), "Reconnecting after backoff"); tokio::select! { _ = tokio::time::sleep(backoff) => {}, _ = shutdown.changed() => return, } backoff = (backoff * 2).min(max_backoff); } } } } async fn obtain_token( api_client: &OwncastApiClient, display_name: &str, cached: &mut Option, ) -> Option { if let Some(ref token) = cached { return Some(token.clone()); } match api_client.register_chat_user(display_name).await { Ok(reg) => { info!(user_id = %reg.id, name = %reg.display_name, "Registered WebSocket chat user"); let token = reg.access_token.clone(); *cached = Some(token.clone()); Some(token) } Err(e) => { warn!(error = %e, "Chat user registration failed"); None } } } fn build_ws_url(base_url: &str, access_token: &str) -> String { let base = base_url.trim_end_matches('/'); let ws_base = if base.starts_with("https://") { base.replacen("https://", "wss://", 1) } else { base.replacen("http://", "ws://", 1) }; format!("{}/ws?accessToken={}", ws_base, access_token) } async fn connect_and_listen( ws_url: &str, event_tx: &mpsc::Sender, shutdown: &mut tokio::sync::watch::Receiver, ) -> anyhow::Result<()> { let (ws_stream, _) = connect_async(ws_url).await?; let (_write, mut read) = ws_stream.split(); info!("WebSocket connected"); loop { tokio::select! { msg = read.next() => { match msg { Some(Ok(ws_msg)) => { if let Ok(text) = ws_msg.into_text() { if let Some(event) = parse_ws_message(&text) { if event_tx.send(event).await.is_err() { return Ok(()); } } } } Some(Err(e)) => return Err(e.into()), None => return Err(anyhow::anyhow!("WebSocket stream ended")), } } _ = shutdown.changed() => return Ok(()), } } } fn parse_ws_message(text: &str) -> Option { let value: serde_json::Value = serde_json::from_str(text).ok()?; let msg_type = value.get("type")?.as_str()?; match msg_type { "CHAT" => { let user = value.get("user")?; let is_bot = user.get("isBot").and_then(|v| v.as_bool()).unwrap_or(false); if is_bot { return None; } let display_name = user.get("displayName")?.as_str()?.to_string(); let body = value.get("body")?.as_str()?; let id = value.get("id").and_then(|v| v.as_str()).map(String::from); Some(BridgeEvent::ChatMessage { source: Source::Owncast, username: display_name, body: strip_html(body), id, is_action: false, }) } _ => None, } } #[cfg(test)] mod tests { use super::*; #[test] fn test_build_ws_url_https() { assert_eq!( build_ws_url("https://owncast.example.com", "tok123"), "wss://owncast.example.com/ws?accessToken=tok123" ); } #[test] fn test_build_ws_url_http() { assert_eq!( build_ws_url("http://localhost:8080", "tok123"), "ws://localhost:8080/ws?accessToken=tok123" ); } #[test] fn test_build_ws_url_trailing_slash() { assert_eq!( build_ws_url("https://owncast.example.com/", "tok123"), "wss://owncast.example.com/ws?accessToken=tok123" ); } #[test] fn test_parse_ws_chat_message() { let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"viewer","isBot":false}}"#; let event = parse_ws_message(json); assert!(matches!( event, Some(BridgeEvent::ChatMessage { ref username, ref body, .. }) if username == "viewer" && body == "hello" )); } #[test] fn test_parse_ws_bot_message_ignored() { let json = r#"{"type":"CHAT","id":"abc","body":"hello","user":{"displayName":"bot","isBot":true}}"#; assert!(parse_ws_message(json).is_none()); } }