#!/usr/bin/env python3 from asif import Client import asyncio import re import aiohttp import json import time import logging import argparse import yaml from pathlib import Path from typing import List, Optional # Set up logging - only show important info logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%H:%M:%S' ) logger = logging.getLogger(__name__) class IcecastBot: def __init__(self, config_path: Optional[str] = None): # Load config self.config = self.load_config(config_path) # Initialize IRC bot with config self.bot = Client( host=self.config['irc']['host'], port=self.config['irc']['port'], user=self.config['irc']['user'], realname=self.config['irc']['realname'], nick=self.config['irc']['nick'] ) # Set up instance variables from config self.channel_name = self.config['irc']['channel'] self.stream_url = self.config['stream']['url'] self.stream_endpoint = self.config['stream']['endpoint'] self.current_song = "Unknown" self.reply = self.config['announce']['format'] self.ignore_patterns = self.config['announce']['ignore_patterns'] self.channel = None self.last_health_check = time.time() self.health_check_interval = self.config['stream']['health_check_interval'] self.setup_handlers() @staticmethod def load_config(config_path: Optional[str] = None) -> dict: """Load configuration from file and/or command line arguments.""" if config_path is None: config_path = Path(__file__).parent / 'config.yaml' # Load config file try: with open(config_path) as f: config = yaml.safe_load(f) except FileNotFoundError: logger.warning(f"Config file not found at {config_path}, using defaults") config = { 'irc': {}, 'stream': {}, 'announce': { 'format': "\x02Now playing:\x02 {song}", 'ignore_patterns': ['Unknown', 'Unable to fetch metadata'] } } return config def should_announce_song(self, song: str) -> bool: """Check if the song should be announced based on ignore patterns.""" return not any(pattern.lower() in song.lower() for pattern in self.ignore_patterns) def setup_handlers(self): @self.bot.on_connected() async def connected(): try: self.channel = await self.bot.join(self.channel_name) logger.info(f"Connected to IRC and joined {self.channel_name}") except Exception as e: logger.error(f"Error joining channel: {e}") asyncio.create_task(self.monitor_metadata()) @self.bot.on_join() async def on_join(channel): if not self.channel: self.channel = channel @self.bot.on_message(re.compile("^!np")) async def now_playing(message): await message.reply(self.reply.format(song=self.current_song)) async def fetch_json_metadata(self): try: async with aiohttp.ClientSession() as session: url = f"{self.stream_url}/status-json.xsl" async with session.get(url) as response: if response.status == 200: data = await response.text() json_data = json.loads(data) if 'icestats' in json_data: sources = json_data['icestats'].get('source', []) if isinstance(sources, list): for src in sources: if src['listenurl'].endswith(f'{self.stream_endpoint}'): source = src else: source = sources title = source.get('title') or source.get('song') or source.get('current_song') if title: return title return "Unable to fetch metadata" except Exception as e: logger.error(f"Error fetching JSON metadata: {e}") return "Error fetching metadata" async def monitor_metadata(self): await asyncio.sleep(5) while True: try: cmd = [ 'curl', '-s', '-H', 'Icy-MetaData: 1', '--no-buffer', f"{self.stream_url}/{self.stream_endpoint}" ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) logger.info("Started stream monitoring") buffer = b"" last_json_check = time.time() json_check_interval = 60 # Fallback interval if ICY updates fail while True: chunk = await process.stdout.read(8192) if not chunk: break buffer += chunk current_time = time.time() # Periodic health check if current_time - self.last_health_check >= self.health_check_interval: logger.info("Monitor status: Active - processing stream data") self.last_health_check = current_time # Look for metadata marker but fetch from JSON if b"StreamTitle='" in buffer: new_song = await self.fetch_json_metadata() if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song: logger.info(f"Now Playing: {new_song}") self.current_song = new_song await self.announce_song(new_song) # Clear buffer after metadata marker buffer = buffer[buffer.find(b"';", buffer.find(b"StreamTitle='")) + 2:] last_json_check = current_time # Keep buffer size reasonable if len(buffer) > 65536: buffer = buffer[-32768:] # Fallback JSON check if ICY updates aren't coming through if current_time - last_json_check >= json_check_interval: new_song = await self.fetch_json_metadata() if "Unable to fetch metadata" in new_song: break if new_song and new_song != self.current_song: logger.info(f"Now Playing (fallback): {new_song}") self.current_song = new_song await self.announce_song(new_song) last_json_check = current_time await asyncio.sleep(0.1) await process.wait() logger.warning("Stream monitor ended, restarting...") await asyncio.sleep(5) except Exception as e: logger.error(f"Stream monitor error: {e}") await asyncio.sleep(5) async def announce_song(self, song: str): """Announce song if it doesn't match any ignore patterns.""" try: if self.channel and self.should_announce_song(song): await self.channel.message(self.reply.format(song=song)) except Exception as e: logger.error(f"Error announcing song: {e}") async def start(self): await self.bot.run() async def run_multiple_bots(config_paths: List[str]): """Run multiple bot instances concurrently.""" bots = [IcecastBot(config_path) for config_path in config_paths] await asyncio.gather(*(bot.start() for bot in bots)) if __name__ == "__main__": parser = argparse.ArgumentParser(description='Icecast IRC Bot') parser.add_argument('configs', nargs='*', help='Paths to config files') parser.add_argument('--config', type=str, help='Path to single config file') parser.add_argument('--irc-host', type=str, help='IRC server host') parser.add_argument('--irc-port', type=int, help='IRC server port') parser.add_argument('--irc-nick', type=str, help='IRC nickname') parser.add_argument('--irc-channel', type=str, help='IRC channel') parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)') parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)') args = parser.parse_args() if args.configs: # Multi-bot mode asyncio.run(run_multiple_bots(args.configs)) else: # Single-bot mode bot = IcecastBot(args.config) # Apply any command line overrides to the config if args.irc_host: bot.config['irc']['host'] = args.irc_host if args.irc_port: bot.config['irc']['port'] = args.irc_port if args.irc_nick: bot.config['irc']['nick'] = args.irc_nick if args.irc_channel: bot.config['irc']['channel'] = args.irc_channel if args.stream_url: bot.config['stream']['url'] = args.stream_url if args.stream_endpoint: bot.config['stream']['endpoint'] = args.stream_endpoint asyncio.run(bot.start())