#!/usr/bin/env python3 # Set up logging configuration before imports import logging import os class AsifFilter(logging.Filter): def filter(self, record): # Block messages from asif module or if they start with "Joined channel" return not (record.module == 'asif' or record.name.startswith('asif') or (isinstance(record.msg, str) and record.msg.startswith('Joined channel'))) # Defer logging setup until after config is loaded def setup_logging(log_level: str = 'INFO'): """Set up logging with the specified level.""" # Convert string level to logging constant numeric_level = getattr(logging, log_level.upper(), logging.INFO) # Set up base logging configuration logging.basicConfig( level=numeric_level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) # Apply filter to root logger logging.getLogger().addFilter(AsifFilter()) # Configure asif's logger asif_logger = logging.getLogger('asif') asif_logger.addFilter(AsifFilter()) asif_logger.setLevel(logging.CRITICAL) asif_logger.propagate = False # Set up error logging for asif error_handler = logging.FileHandler('ERROR.log') error_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) from asif import Client # Patch asif's Client class def patch_client_bg(self, coro): """Patch for Client._bg to properly handle SystemExit""" async def runner(): try: await coro except SystemExit: raise # Re-raise SystemExit except Exception: self._log.exception("async: Coroutine raised exception") return asyncio.ensure_future(runner()) # Patch asif's Client class to use error logger and patched _bg def silent_client_init(self, *args, **kwargs): # Create a logger that writes to ERROR.log error_logger = logging.getLogger('asif.client') error_logger.addHandler(error_handler) error_logger.propagate = False # Don't send to console error_logger.setLevel(logging.INFO) # Capture all messages # Store the logger self._log = error_logger # Patch the _bg method self._bg = patch_client_bg.__get__(self) # Call the original __init__ original_init(self, *args, **kwargs) # Save original __init__ and replace it original_init = Client.__init__ Client.__init__ = silent_client_init import asyncio import re import aiohttp import json import time import argparse import yaml from pathlib import Path from typing import List, Optional import sys # ANSI color codes class Colors: COLORS = [ '\033[94m', # BLUE '\033[96m', # CYAN '\033[92m', # GREEN '\033[93m', # YELLOW '\033[95m', # MAGENTA '\033[91m', # RED ] ENDC = '\033[0m' BOLD = '\033[1m' # Track used colors used_colors = {} # botname -> color mapping @classmethod def get_color_for_bot(cls, botname: str) -> str: """Get a consistent color for a bot based on its name.""" # If this bot already has a color, return it if botname in cls.used_colors: return cls.used_colors[botname] # If we still have unused colors, use the next available one unused_colors = [c for c in cls.COLORS if c not in cls.used_colors.values()] if unused_colors: color = unused_colors[0] else: # If we're out of unique colors, fall back to hash-based selection color = cls.COLORS[hash(botname) % len(cls.COLORS)] cls.used_colors[botname] = color return color class BotLoggerAdapter(logging.LoggerAdapter): # Class variables to track maximum lengths max_nick_length = 0 max_endpoint_length = 0 instances = [] # Keep track of all instances to update padding def __init__(self, logger, extra): super().__init__(logger, extra) botname = extra['botname'] nick, endpoint = botname.split('@') self.nick = nick self.endpoint = endpoint # Update max lengths (without ANSI codes) old_max_nick = BotLoggerAdapter.max_nick_length old_max_endpoint = BotLoggerAdapter.max_endpoint_length BotLoggerAdapter.max_nick_length = max( BotLoggerAdapter.max_nick_length, len(nick) ) BotLoggerAdapter.max_endpoint_length = max( BotLoggerAdapter.max_endpoint_length, len(endpoint) ) # If max lengths changed, update all existing instances if (old_max_nick != BotLoggerAdapter.max_nick_length or old_max_endpoint != BotLoggerAdapter.max_endpoint_length): for instance in BotLoggerAdapter.instances: instance.update_padding() # Add self to instances list BotLoggerAdapter.instances.append(self) # Initial padding calculation self.update_padding() def update_padding(self): """Update the colored botname with current padding requirements.""" # Right-align nick, then @ symbol, then colored endpoint nick_padding = " " * (BotLoggerAdapter.max_nick_length - len(self.nick)) endpoint_padding = " " * (BotLoggerAdapter.max_endpoint_length - len(self.endpoint)) self.colored_botname = f"{nick_padding}{self.nick}@{Colors.BOLD}{Colors.get_color_for_bot(self.nick+'@'+self.endpoint)}{self.endpoint}{Colors.ENDC}{endpoint_padding}" def process(self, msg, kwargs): return f'[{self.colored_botname}] {msg}', kwargs # Add flag file constant at the top level RESTART_FLAG_FILE = ".restart_flag" class IcecastBot: def __init__(self, config_path: Optional[str] = None): # Load config self.config = self.load_config(config_path) # Set up bot-specific logger self.logger = BotLoggerAdapter( logging.getLogger(__name__), {'botname': f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}'} ) # Initialize IRC bot with config self.bot = Client( host=self.config['irc']['host'], port=self.config['irc']['port'], user=self.config['irc']['user'], realname=self.config['irc']['realname'], nick=self.config['irc']['nick'] ) # Set up instance variables from config self.channel_name = self.config['irc']['channel'] self.stream_url = self.config['stream']['url'] self.stream_endpoint = self.config['stream']['endpoint'] self.current_song = "Unknown" self.reply = self.config['announce']['format'] self.ignore_patterns = self.config['announce']['ignore_patterns'] self.channel = None self.last_health_check = time.time() self.health_check_interval = self.config['stream']['health_check_interval'] # Get command settings from config self.cmd_prefix = self.config.get('commands', {}).get('prefix', '!') self.require_nick_prefix = self.config.get('commands', {}).get('require_nick_prefix', False) self.allow_private_commands = self.config.get('commands', {}).get('allow_private_commands', False) # Control flags self.monitor_task = None self.should_monitor = True self.is_monitoring = False self.admin_users = self.config.get('admin', {}).get('users', ['*']) # '*' means anyone can use admin commands self.current_process = None # Track the current subprocess # Create a unique restart flag file for this bot instance self.restart_flag_file = f".restart_flag_{self.config['irc']['nick']}_{self.stream_endpoint}" self.setup_handlers() @staticmethod def load_config(config_path: Optional[str] = None) -> dict: """Load configuration from file and/or command line arguments.""" if config_path is None: config_path = Path(__file__).parent / 'config.yaml' # Load config file try: with open(config_path) as f: config = yaml.safe_load(f) except FileNotFoundError: # Create a temporary logger for config loading temp_logger = logging.getLogger(__name__) temp_logger.warning(f"Config file not found at {config_path}, using defaults") config = { 'irc': {}, 'stream': {}, 'announce': { 'format': "\x02Now playing:\x02 {song}", 'ignore_patterns': ['Unknown', 'Unable to fetch metadata'] }, 'logging': { 'level': 'INFO' } } # Set up logging with configured level setup_logging(config.get('logging', {}).get('level', 'INFO')) return config def should_announce_song(self, song: str) -> bool: """Check if the song should be announced based on ignore patterns.""" return not any(pattern.lower() in song.lower() for pattern in self.ignore_patterns) def setup_handlers(self): @self.bot.on_connected() async def connected(): try: self.channel = await self.bot.join(self.channel_name) self.logger.info(f"Connected to IRC and joined {self.channel_name}") except Exception as e: self.logger.error(f"Error joining channel: {e}") if self.should_monitor: await self.start_monitoring() @self.bot.on_join() async def on_join(channel): # Silently store the channel without logging if not self.channel: self.channel = channel def create_command_pattern(cmd: str) -> re.Pattern: """Create a regex pattern for a command that handles nick prefixes.""" if self.require_nick_prefix: pattern = f"^({self.config['irc']['nick']}[:,] )?{re.escape(self.cmd_prefix)}{cmd}($| .*$)" else: pattern = f"^{re.escape(self.cmd_prefix)}{cmd}($| .*$)" self.logger.debug(f"Created command pattern for '{cmd}': {pattern}") return re.compile(pattern) # Global message handler for debugging @self.bot.on_message() async def debug_message(message): """Debug handler to log all messages""" try: # Log full message details only at DEBUG level self.logger.debug( f"Received message: " f"recipient={getattr(message, 'recipient', None)!r}, " f"sender={getattr(message, 'sender', None)!r}, " f"text={getattr(message, 'text', None)!r}" ) # Test each command pattern msg = getattr(message, 'text', None) if msg and isinstance(msg, str): for cmd in ['np', 'help', 'restart', 'quit', 'reconnect', 'stop', 'start']: pattern = create_command_pattern(cmd) if pattern.match(msg): self.logger.debug(f"Command matched: {cmd}") except Exception as e: self.logger.error(f"Error in debug message handler: {e}") # Command handlers with additional debug logging @self.bot.on_message(create_command_pattern('np')) async def now_playing(message): self.logger.debug(f"np handler called with message: {getattr(message, 'text', None)!r}") recipient = getattr(message, 'recipient', None) # Check if recipient is a Channel object is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') self.logger.debug(f"Recipient check - is_channel: {is_channel}, recipient: {recipient}") if not self.allow_private_commands and not is_channel: self.logger.debug("Ignoring private np command") return try: if is_channel: await recipient.message(self.reply.format(song=self.current_song)) self.logger.debug("Successfully sent np reply") else: self.logger.debug("Could not send np reply - invalid recipient") except Exception as e: self.logger.error(f"Error in np handler: {e}", exc_info=True) @self.bot.on_message(create_command_pattern('help')) async def help_command(message): self.logger.debug(f"help handler called with message: {getattr(message, 'text', None)!r}") recipient = getattr(message, 'recipient', None) # Check if recipient is a Channel object is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') self.logger.debug(f"Recipient check - is_channel: {is_channel}, recipient: {recipient}") if not self.allow_private_commands and not is_channel: self.logger.debug("Ignoring private help command") return try: prefix = self.cmd_prefix help_text = ( f"Available commands:\n" f"{prefix}np - Show current song\n" f"{prefix}help - Show this help message\n" ) if self.is_admin(message.sender): help_text += ( f"{prefix}start - Start stream monitoring\n" f"{prefix}stop - Stop stream monitoring\n" f"{prefix}reconnect - Reconnect to stream\n" f"{prefix}restart - Restart the bot\n" f"{prefix}quit - Shutdown the bot\n" ) if is_channel: await recipient.message(help_text) self.logger.debug("Successfully sent help reply") else: self.logger.debug("Could not send help reply - invalid recipient") except Exception as e: self.logger.error(f"Error in help handler: {e}", exc_info=True) @self.bot.on_message(create_command_pattern('restart')) async def restart_bot(message): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return self.logger.info(f"Restart command received from {message.sender}") await self.stop_monitoring() try: await self.bot.quit("Restarting...") with open(self.restart_flag_file, 'w') as f: f.write('restart') except Exception as e: self.logger.error(f"Error during restart: {e}") sys.exit(0) @self.bot.on_message(create_command_pattern('quit')) async def quit_bot(message): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return self.logger.info(f"Quit command received from {message.sender}") await self.stop_monitoring() try: await self.bot.quit("Shutting down...") except Exception as e: self.logger.error(f"Error during shutdown: {e}") sys.exit(0) @self.bot.on_message(create_command_pattern('reconnect')) async def reconnect_stream(message): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return self.logger.info(f"Reconnect command received from {message.sender}") success = await self.restart_monitoring() if not success and is_channel: await recipient.message("Stream reconnection may have failed. Check logs for details.") @self.bot.on_message(create_command_pattern('stop')) async def stop_monitoring(message): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return self.logger.info(f"Stop command received from {message.sender}") await self.stop_monitoring() if is_channel: await recipient.message("Stream monitoring stopped.") @self.bot.on_message(create_command_pattern('start')) async def start_monitoring(message): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return self.logger.info(f"Start command received from {message.sender}") await self.start_monitoring() if is_channel: await recipient.message("Stream monitoring started.") def is_admin(self, user: str) -> bool: """Check if a user has admin privileges. Args: user: Full IRC user string (nickname!username@hostname) or User object Returns: bool: True if user has admin privileges """ try: if hasattr(user, 'name'): nickname = user.name else: nickname = user.split('!')[0] if '!' in user else user except Exception as e: self.logger.error(f"Error extracting nickname: {e}") return False return '*' in self.admin_users or nickname in self.admin_users async def start_monitoring(self): """Start the metadata monitoring task.""" if not self.is_monitoring: self.should_monitor = True self.monitor_task = asyncio.create_task(self.monitor_metadata()) self.is_monitoring = True self.logger.info("Started metadata monitoring task") async def stop_monitoring(self): """Stop the metadata monitoring task.""" self.should_monitor = False self.is_monitoring = False # First, terminate the subprocess if it exists if self.current_process and self.current_process.returncode is None: try: self.current_process.terminate() try: await asyncio.wait_for(self.current_process.wait(), timeout=2.0) except asyncio.TimeoutError: self.current_process.kill() await self.current_process.wait() except Exception as e: self.logger.error(f"Error terminating subprocess: {e}") finally: self.current_process = None # Then cancel the monitoring task if self.monitor_task: self.monitor_task.cancel() try: await self.monitor_task except asyncio.CancelledError: pass except Exception as e: self.logger.error(f"Error cancelling monitor task: {e}") finally: self.monitor_task = None self.logger.info("Stopped metadata monitoring task") async def restart_monitoring(self): """Restart the metadata monitoring task and verify the reconnection.""" await self.stop_monitoring() # Store the channel for status updates notify_channel = self.channel try: await self.start_monitoring() # Wait for up to 10 seconds to confirm data is being received start_time = time.time() while time.time() - start_time < 10: if self.current_process and self.current_process.returncode is None: # Try to fetch metadata to confirm connection test_song = await self.fetch_json_metadata() if test_song and "Unable to fetch metadata" not in test_song: if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message("Stream reconnected successfully.") return True await asyncio.sleep(1) # If we get here, the connection wasn't confirmed if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message("Stream reconnection attempt completed, but status uncertain. Check logs for details.") return False except Exception as e: self.logger.error(f"Error during stream reconnection: {e}") if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message(f"Failed to reconnect to stream: {str(e)}") return False async def fetch_json_metadata(self): """Fetch metadata from the Icecast JSON status endpoint.""" try: # Try different URL patterns base_urls = [ self.stream_url, # Original URL self.stream_url.replace('live.', 'listen.'), # Try listen. instead of live. '/'.join(self.stream_url.split('/')[:-1]) if '/' in self.stream_url else self.stream_url # Try parent path ] for base_url in base_urls: try: url = f"{base_url}/status-json.xsl" self.logger.debug(f"Attempting to fetch metadata from: {url}") async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: data = await response.text() json_data = json.loads(data) if 'icestats' in json_data: sources = json_data['icestats'].get('source', []) if not isinstance(sources, list): sources = [sources] # Find our stream for src in sources: if src.get('listenurl', '').endswith(self.stream_endpoint): title = src.get('title') or src.get('song') or src.get('current_song') if title: return title except aiohttp.ClientError as e: self.logger.debug(f"Failed to fetch from {url}: {e}") continue except json.JSONDecodeError as e: self.logger.debug(f"Failed to parse JSON from {url}: {e}") continue self.logger.warning("Unable to fetch metadata from any URL") return "Unable to fetch metadata" except Exception as e: self.logger.error(f"Error fetching JSON metadata: {e}", exc_info=True) return "Error fetching metadata" async def monitor_metadata(self): await asyncio.sleep(5) while self.should_monitor: try: cmd = [ 'curl', '-s', '-H', 'Icy-MetaData: 1', '--no-buffer', f"{self.stream_url}/{self.stream_endpoint}" ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) self.current_process = process # Store the process reference self.logger.info("Started stream monitoring") last_data_received = time.time() buffer = b"" last_json_check = time.time() json_check_interval = 60 # Fallback interval if ICY updates fail data_timeout = 180 # 3 minutes without data is considered a failure empty_chunks_count = 0 max_empty_chunks = 5 # After 5 empty chunks in a row, reconnect while True: try: chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0) if chunk: last_data_received = time.time() buffer += chunk empty_chunks_count = 0 # Reset counter on successful data else: empty_chunks_count += 1 self.logger.warning(f"Received empty chunk from stream ({empty_chunks_count}/{max_empty_chunks})") if empty_chunks_count >= max_empty_chunks: self.logger.error("Too many empty chunks in a row, restarting connection") break if time.time() - last_data_received > data_timeout: self.logger.error(f"No data received for {data_timeout} seconds, restarting connection") break current_time = time.time() # Periodic health check if current_time - self.last_health_check >= self.health_check_interval: self.logger.info( f"Monitor status: Active - Last data received {int(current_time - last_data_received)}s ago" ) self.last_health_check = current_time # Look for metadata marker but fetch from JSON if b"StreamTitle='" in buffer: new_song = await self.fetch_json_metadata() if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song: self.logger.info(f"Now Playing: {new_song}") self.current_song = new_song await self.announce_song(new_song) # Clear buffer after metadata marker buffer = buffer[buffer.find(b"';", buffer.find(b"StreamTitle='")) + 2:] last_json_check = current_time # Keep buffer size reasonable if len(buffer) > 65536: buffer = buffer[-32768:] # Fallback JSON check if ICY updates aren't coming through if current_time - last_json_check >= json_check_interval: new_song = await self.fetch_json_metadata() if "Unable to fetch metadata" in new_song: if time.time() - last_data_received > data_timeout: break if new_song and new_song != self.current_song: self.logger.info(f"Now Playing (fallback): {new_song}") self.current_song = new_song await self.announce_song(new_song) last_json_check = current_time await asyncio.sleep(0.1) except asyncio.TimeoutError: self.logger.warning("Timeout while reading stream data") if time.time() - last_data_received > data_timeout: self.logger.error("Stream read timeout exceeded limit, restarting connection") break continue except Exception as e: self.logger.error(f"Error in stream processing loop: {e}") break # Check if process is still running and terminate if needed if process.returncode is None: try: process.terminate() await asyncio.wait_for(process.wait(), timeout=5.0) except asyncio.TimeoutError: self.logger.warning("Process did not terminate gracefully, killing it") process.kill() await process.wait() except Exception as e: self.logger.error(f"Error terminating process: {e}") finally: self.current_process = None self.logger.warning("Stream monitor ended, restarting in 5 seconds...") await asyncio.sleep(5) except Exception as e: self.logger.error(f"Stream monitor error: {e}") await asyncio.sleep(5) async def announce_song(self, song: str): """Announce song if it doesn't match any ignore patterns.""" try: if self.channel and self.should_announce_song(song): # Use the stored channel object directly if hasattr(self.channel, 'name') and self.channel.name.startswith('#'): await self.channel.message(self.reply.format(song=song)) self.logger.debug(f"Successfully announced song: {song}") else: self.logger.error(f"Could not announce song - invalid channel object: {self.channel}") except Exception as e: self.logger.error(f"Error announcing song: {e}", exc_info=True) async def start(self): await self.bot.run() async def run_multiple_bots(config_paths: List[str]): """Run multiple bot instances concurrently.""" bots = [] for config_path in config_paths: try: bot = IcecastBot(config_path) bots.append(bot) except Exception as e: logging.error(f"Failed to initialize bot with config {config_path}: {e}") if not bots: logging.error("No bots were successfully initialized") return await asyncio.gather(*(bot.start() for bot in bots)) if __name__ == "__main__": parser = argparse.ArgumentParser(description='Icecast IRC Bot') parser.add_argument('configs', nargs='*', help='Paths to config files') parser.add_argument('--config', type=str, help='Path to single config file') parser.add_argument('--irc-host', type=str, help='IRC server host') parser.add_argument('--irc-port', type=int, help='IRC server port') parser.add_argument('--irc-nick', type=str, help='IRC nickname') parser.add_argument('--irc-channel', type=str, help='IRC channel') parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)') parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)') parser.add_argument('--cmd-prefix', type=str, help='Command prefix character(s)') args = parser.parse_args() # Check for restart flag files at startup try: for flag_file in Path('.').glob('.restart_flag_*'): try: flag_file.unlink() except Exception as e: logging.error(f"Error removing restart flag file {flag_file}: {e}") except Exception as e: logging.error(f"Error handling restart flag files: {e}") # Set up the event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def run_bot(): try: if args.configs: # Multi-bot mode await run_multiple_bots(args.configs) else: # Single-bot mode bot = IcecastBot(args.config) # Apply any command line overrides to the config if args.irc_host: bot.config['irc']['host'] = args.irc_host if args.irc_port: bot.config['irc']['port'] = args.irc_port if args.irc_nick: bot.config['irc']['nick'] = args.irc_nick if args.irc_channel: bot.config['irc']['channel'] = args.irc_channel if args.stream_url: bot.config['stream']['url'] = args.stream_url if args.stream_endpoint: bot.config['stream']['endpoint'] = args.stream_endpoint if args.cmd_prefix: if 'commands' not in bot.config: bot.config['commands'] = {} bot.config['commands']['prefix'] = args.cmd_prefix await bot.start() except Exception as e: logging.error(f"Unhandled exception: {e}") sys.exit(1) try: # Run the bot loop.run_until_complete(run_bot()) finally: try: # Cancel any remaining tasks for task in asyncio.all_tasks(loop): task.cancel() # Give tasks a chance to cancel loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)) # Finally close the loop loop.close() except Exception as e: logging.error(f"Error during cleanup: {e}") sys.exit(1)