2025-02-23 00:55:11 -08:00

250 lines
9.6 KiB
Python

#!/usr/bin/env python3
from asif import Client
import asyncio
import re
import aiohttp
import json
import time
import logging
import argparse
import yaml
from pathlib import Path
from typing import List, Optional
# Configure process-wide logging: INFO and above, each record prefixed with a
# time-of-day stamp (no date) followed by the bare message.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class IcecastBot:
    """IRC bot that announces the track currently playing on an Icecast stream.

    The bot reads the raw stream through a ``curl`` subprocess and uses the
    appearance of an ICY ``StreamTitle='`` marker only as a trigger; the title
    itself is always taken from the server's ``status-json.xsl`` document.
    A periodic JSON poll acts as a fallback when no ICY marker shows up.
    """

    # Placeholder strings returned by fetch_json_metadata() instead of a title.
    METADATA_UNAVAILABLE = "Unable to fetch metadata"
    METADATA_ERROR = "Error fetching metadata"

    # Byte sequences delimiting the title inside an ICY metadata frame.
    ICY_TITLE_MARKER = b"StreamTitle='"
    ICY_TITLE_END = b"';"

    # Upper bound (seconds) for one status-json.xsl request. The request is
    # awaited inside the stream-reading loop, so it must not stall for long.
    METADATA_TIMEOUT = 10

    def __init__(self, config_path: Optional[str] = None):
        """Load the configuration and build the IRC client.

        Args:
            config_path: Path to a YAML config file; ``None`` selects
                ``config.yaml`` next to this script (see ``load_config``).

        Raises:
            KeyError: If a required ``irc`` or ``stream`` setting is missing.
        """
        # Load config
        self.config = self.load_config(config_path)
        irc_cfg = self.config['irc']
        stream_cfg = self.config['stream']
        announce_cfg = self.config['announce']

        # Initialize IRC bot with config
        self.bot = Client(
            host=irc_cfg['host'],
            port=irc_cfg['port'],
            user=irc_cfg['user'],
            realname=irc_cfg['realname'],
            nick=irc_cfg['nick']
        )

        # Set up instance variables from config
        self.channel_name = irc_cfg['channel']
        self.stream_url = stream_cfg['url']
        self.stream_endpoint = stream_cfg['endpoint']
        self.current_song = "Unknown"
        self.reply = announce_cfg['format']
        self.ignore_patterns = announce_cfg['ignore_patterns']
        self.channel = None
        self.last_health_check = time.time()
        self.health_check_interval = stream_cfg['health_check_interval']

        # Handle of the background monitor task. Keeping the reference stops
        # the task from being garbage-collected while it is still running.
        self._monitor_task = None

        self.setup_handlers()

    @staticmethod
    def load_config(config_path: Optional[str] = None) -> dict:
        """Load the bot configuration from a YAML file.

        Missing ``irc``/``stream``/``announce`` sections are created, and
        missing ``announce`` keys are filled from built-in defaults, so callers
        can always index those three sections.

        Args:
            config_path: Path to the YAML file; ``None`` selects
                ``config.yaml`` in the directory of this script.

        Returns:
            The configuration mapping. When the file does not exist, or does
            not contain a mapping (e.g. it is empty), only the defaults are
            returned.
        """
        defaults = {
            'irc': {},
            'stream': {},
            'announce': {
                'format': "\x02Now playing:\x02 {song}",
                'ignore_patterns': ['Unknown', 'Unable to fetch metadata']
            }
        }

        if config_path is None:
            config_path = Path(__file__).parent / 'config.yaml'

        # Load config file
        try:
            with open(config_path, encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
        except FileNotFoundError:
            logger.warning(f"Config file not found at {config_path}, using defaults")
            return defaults

        # yaml.safe_load() returns None for an empty document.
        if not isinstance(loaded, dict):
            logger.warning(f"Config file at {config_path} is empty or not a mapping, using defaults")
            return defaults

        # Layer the file's values over the defaults, section by section.
        for section, section_defaults in defaults.items():
            merged = dict(section_defaults)
            merged.update(loaded.get(section) or {})
            loaded[section] = merged
        return loaded

    def should_announce_song(self, song: str) -> bool:
        """Return True unless *song* contains an ignore pattern (case-insensitive)."""
        lowered = song.lower()
        return not any(pattern.lower() in lowered for pattern in self.ignore_patterns)

    def setup_handlers(self):
        """Register the IRC event handlers on the client."""

        @self.bot.on_connected()
        async def connected():
            try:
                self.channel = await self.bot.join(self.channel_name)
                logger.info(f"Connected to IRC and joined {self.channel_name}")
            except Exception as e:
                logger.error(f"Error joining channel: {e}")
            # Start the monitor once. Should the connected event fire again
            # while the monitor is alive, a second monitor would announce
            # every song twice.
            if self._monitor_task is None or self._monitor_task.done():
                self._monitor_task = asyncio.create_task(self.monitor_metadata())

        @self.bot.on_join()
        async def on_join(channel):
            # Only adopt the channel if the join in connected() did not set it.
            if not self.channel:
                self.channel = channel

        @self.bot.on_message(re.compile("^!np"))
        async def now_playing(message):
            await message.reply(self.reply.format(song=self.current_song))

    @staticmethod
    def _extract_title(status, endpoint) -> Optional[str]:
        """Pick the current title for *endpoint* out of a status-json document.

        Args:
            status: Decoded ``status-json.xsl`` payload.
            endpoint: Mount name; a listed source matches when its
                ``listenurl`` ends with this string.

        Returns:
            The title as a string, or ``None`` when the payload has no usable
            source or title.
        """
        if not isinstance(status, dict):
            return None
        icestats = status.get('icestats')
        if not isinstance(icestats, dict):
            return None

        sources = icestats.get('source', [])
        if isinstance(sources, list):
            suffix = str(endpoint)
            source = None
            for candidate in sources:
                if not isinstance(candidate, dict):
                    continue
                # No break: when several entries match, the last one is used.
                if str(candidate.get('listenurl', '')).endswith(suffix):
                    source = candidate
        else:
            # A non-list value is used as the source without a mount check.
            source = sources

        if not isinstance(source, dict):
            return None
        title = source.get('title') or source.get('song') or source.get('current_song')
        return str(title) if title else None

    @classmethod
    def _drop_metadata_marker(cls, buffer: bytes) -> bytes:
        """Return *buffer* with the first ICY title frame removed.

        Everything up to and including the frame's ``';`` terminator is
        dropped. If the terminator has not arrived yet, everything through the
        marker itself is dropped so the same marker cannot trigger a second
        metadata lookup on the next chunk.
        """
        start = buffer.find(cls.ICY_TITLE_MARKER)
        if start == -1:
            return buffer
        end = buffer.find(cls.ICY_TITLE_END, start)
        if end == -1:
            return buffer[start + len(cls.ICY_TITLE_MARKER):]
        return buffer[end + len(cls.ICY_TITLE_END):]

    def _is_new_song(self, song) -> bool:
        """Return True if *song* is a real title that differs from the current one."""
        if not song or song == self.current_song:
            return False
        placeholders = (self.METADATA_UNAVAILABLE, self.METADATA_ERROR)
        return not any(placeholder in song for placeholder in placeholders)

    def _stream_address(self) -> str:
        """Join base URL and mount with exactly one slash between them."""
        base = str(self.stream_url).rstrip('/')
        mount = str(self.stream_endpoint).lstrip('/')
        return f"{base}/{mount}"

    async def fetch_json_metadata(self):
        """Fetch the current title from the server's ``status-json.xsl``.

        Returns:
            The title string, ``"Unable to fetch metadata"`` when the server
            answered without a usable title (non-200 status, no matching
            source, no title field), or ``"Error fetching metadata"`` when the
            request or JSON decoding raised.
        """
        url = f"{str(self.stream_url).rstrip('/')}/status-json.xsl"
        try:
            timeout = aiohttp.ClientTimeout(total=self.METADATA_TIMEOUT)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        return self.METADATA_UNAVAILABLE
                    data = await response.text()
            json_data = json.loads(data)
        except Exception as e:
            logger.error(f"Error fetching JSON metadata: {e}")
            return self.METADATA_ERROR
        return self._extract_title(json_data, self.stream_endpoint) or self.METADATA_UNAVAILABLE

    async def _publish_song(self, song: str, log_prefix: str):
        """Record *song* as the current track, log it and announce it."""
        logger.info(f"{log_prefix}: {song}")
        self.current_song = song
        await self.announce_song(song)

    async def _watch_stream(self, process):
        """Consume the curl output of *process* until the stream needs a restart.

        Returns when curl closes its output, or when the fallback JSON check
        reports that metadata is unavailable.
        """
        buffer = b""
        last_json_check = time.time()
        json_check_interval = 60  # Fallback interval if ICY updates fail

        while True:
            chunk = await process.stdout.read(8192)
            if not chunk:
                return
            buffer += chunk
            current_time = time.time()

            # Periodic health check
            if current_time - self.last_health_check >= self.health_check_interval:
                logger.info("Monitor status: Active - processing stream data")
                self.last_health_check = current_time

            # Look for metadata marker but fetch from JSON
            if self.ICY_TITLE_MARKER in buffer:
                new_song = await self.fetch_json_metadata()
                if self._is_new_song(new_song):
                    await self._publish_song(new_song, "Now Playing")
                # Clear buffer after metadata marker
                buffer = self._drop_metadata_marker(buffer)
                last_json_check = current_time

            # Keep buffer size reasonable
            if len(buffer) > 65536:
                buffer = buffer[-32768:]

            # Fallback JSON check if ICY updates aren't coming through
            if current_time - last_json_check >= json_check_interval:
                new_song = await self.fetch_json_metadata()
                if self.METADATA_UNAVAILABLE in new_song:
                    return
                if self._is_new_song(new_song):
                    await self._publish_song(new_song, "Now Playing (fallback)")
                last_json_check = current_time

            await asyncio.sleep(0.1)

    @staticmethod
    async def _stop_process(process):
        """Terminate *process* if it is still running and reap it."""
        if process is None:
            return
        if process.returncode is None:
            try:
                process.terminate()
            except ProcessLookupError:
                pass  # Already exited on its own.
        try:
            await asyncio.wait_for(process.wait(), timeout=5)
        except asyncio.TimeoutError:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()

    async def monitor_metadata(self):
        """Watch the stream forever, restarting curl whenever it stops.

        Each round starts ``curl`` on the stream URL with ICY metadata
        requested, hands its output to ``_watch_stream`` and then waits five
        seconds before the next round. The subprocess is always stopped before
        a restart, so a still-streaming curl can neither block the loop nor be
        left behind after an error.
        """
        await asyncio.sleep(5)
        while True:
            process = None
            try:
                cmd = [
                    'curl',
                    '-s',
                    '-H', 'Icy-MetaData: 1',
                    '--no-buffer',
                    self._stream_address()
                ]
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    # stderr is never read; discarding it keeps a full pipe
                    # from ever blocking curl.
                    stderr=asyncio.subprocess.DEVNULL
                )
                logger.info("Started stream monitoring")
                await self._watch_stream(process)
                logger.warning("Stream monitor ended, restarting...")
            except Exception as e:
                logger.error(f"Stream monitor error: {e}")
            finally:
                await self._stop_process(process)
            await asyncio.sleep(5)

    async def announce_song(self, song: str):
        """Announce song if it doesn't match any ignore patterns."""
        try:
            if self.channel and self.should_announce_song(song):
                await self.channel.message(self.reply.format(song=song))
        except Exception as e:
            logger.error(f"Error announcing song: {e}")

    async def start(self):
        """Run the IRC client until it stops."""
        await self.bot.run()
async def run_multiple_bots(config_paths: List[str]):
"""Run multiple bot instances concurrently."""
bots = [IcecastBot(config_path) for config_path in config_paths]
await asyncio.gather(*(bot.start() for bot in bots))
def apply_cli_overrides(config: dict, args: argparse.Namespace) -> dict:
    """Write command-line overrides into *config* and return it.

    Args:
        config: Configuration mapping with (possibly missing) ``irc`` and
            ``stream`` sections. It is modified in place.
        args: Parsed arguments carrying ``irc_host``, ``irc_port``,
            ``irc_nick``, ``irc_channel``, ``stream_url`` and
            ``stream_endpoint``. Values that are ``None`` or an empty string
            leave the config untouched.

    Returns:
        The same *config* object, for convenient chaining.
    """
    overrides = (
        ('irc', 'host', args.irc_host),
        ('irc', 'port', args.irc_port),
        ('irc', 'nick', args.irc_nick),
        ('irc', 'channel', args.irc_channel),
        ('stream', 'url', args.stream_url),
        ('stream', 'endpoint', args.stream_endpoint),
    )
    for section, key, value in overrides:
        if value is None or value == '':
            continue
        # A section may be absent, or present but null, in the loaded file.
        if not isinstance(config.get(section), dict):
            config[section] = {}
        config[section][key] = value
    return config


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Icecast IRC Bot')
    parser.add_argument('configs', nargs='*', help='Paths to config files')
    parser.add_argument('--config', type=str, help='Path to single config file')
    parser.add_argument('--irc-host', type=str, help='IRC server host')
    parser.add_argument('--irc-port', type=int, help='IRC server port')
    parser.add_argument('--irc-nick', type=str, help='IRC nickname')
    parser.add_argument('--irc-channel', type=str, help='IRC channel')
    parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)')
    parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)')
    args = parser.parse_args()

    if args.configs:
        # Multi-bot mode: every bot takes its settings from its own file.
        single_bot_options = (
            args.config, args.irc_host, args.irc_port, args.irc_nick,
            args.irc_channel, args.stream_url, args.stream_endpoint,
        )
        if any(option is not None for option in single_bot_options):
            logger.warning("--config and override options are ignored when positional config files are given")
        asyncio.run(run_multiple_bots(args.configs))
    else:
        # Single-bot mode.
        # IcecastBot.__init__ builds the IRC client and copies channel/stream
        # settings straight from the loaded config, so overrides written into
        # bot.config afterwards would have no effect. Hooking load_config()
        # applies them before the constructor reads anything.
        class _CliConfiguredBot(IcecastBot):
            """IcecastBot whose loaded config is patched with the CLI overrides."""

            @staticmethod
            def load_config(config_path: Optional[str] = None) -> dict:
                return apply_cli_overrides(IcecastBot.load_config(config_path), args)

        bot = _CliConfiguredBot(args.config)
        asyncio.run(bot.start())