#!/usr/bin/env python3 import re import json import os import sys import asyncio import aiohttp import time import argparse import yaml from pathlib import Path from typing import List, Optional import sys import inspect import socket import tempfile import signal # Import our custom logger from logger import log, debug, info, warning, error, critical, exception, get_logger # ANSI color codes for backward compatibility if needed class Colors: BLUE = "\033[94m" CYAN = "\033[96m" GREEN = "\033[92m" YELLOW = "\033[93m" MAGENTA = "\033[95m" RED = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" WHITE = "\033[97m" ORANGE = "\033[38;5;208m" from asif import Client # Patch asif's Client class def patch_client_bg(self, coro): """Patch for Client._bg to properly handle SystemExit""" async def runner(): try: await coro except SystemExit: raise # Re-raise SystemExit except Exception: pass return asyncio.ensure_future(runner()) # Patch asif's Client class def silent_client_init(self, *args, bot_name: str = None, config: dict = None, **kwargs): # Patch the _bg method self._bg = patch_client_bg.__get__(self) # Call the original __init__ original_init(self, *args, **kwargs) # Save original __init__ and replace it original_init = Client.__init__ Client.__init__ = silent_client_init import asyncio import aiohttp import time import argparse import yaml from pathlib import Path from typing import List, Optional import sys import inspect import socket import tempfile class RestartManager: """Manages restart requests for the bot. Uses a Unix domain socket to listen for restart commands. """ def __init__(self, bot_id: str): """Initialize the restart manager. Args: bot_id: Unique identifier for this bot instance """ # Get a logger for this class self.logger = get_logger("RestartManager") self.bot_id = bot_id self.socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock" self.server = None self.should_restart = False self.quiet_requested = False self.unquiet_requested = False async def start(self): """Start the restart manager server.""" # Clean up any existing socket if self.socket_path.exists(): self.socket_path.unlink() # Ensure the parent directory exists self.socket_path.parent.mkdir(parents=True, exist_ok=True) # Create the Unix Domain Socket server try: self.server = await asyncio.start_unix_server( self._handle_restart_request, str(self.socket_path) ) self.logger.info(f"Restart manager server started at {self.socket_path}") except Exception as e: self.logger.error(f"Error starting restart manager server: {e}") # Continue without the restart manager pass async def _handle_restart_request(self, reader, writer): """Handle an incoming restart request.""" try: data = await reader.read() if data == b'restart': self.logger.info("Received restart request") self.should_restart = True elif data == b'quiet': self.logger.info("Received quiet request") self.quiet_requested = True elif data == b'unquiet': self.logger.info("Received unquiet request") self.unquiet_requested = True writer.close() await writer.wait_closed() except Exception as e: self.logger.error(f"Error handling restart request: {e}") def cleanup(self): """Clean up the restart manager resources.""" if self.server: self.logger.debug("Closing restart manager server") self.server.close() if self.socket_path.exists(): self.logger.debug(f"Removing socket file: {self.socket_path}") try: self.socket_path.unlink() except Exception as e: self.logger.error(f"Error removing socket file: {e}") @staticmethod async def signal_restart(bot_id: str): """Signal a specific bot to restart. Args: bot_id: ID of the bot to restart """ # Get a logger for this static method logger = get_logger("RestartManager") socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock" logger.info(f"Sending restart signal to bot {bot_id} at {socket_path}") try: reader, writer = await asyncio.open_unix_connection(str(socket_path)) writer.write(b'restart') await writer.drain() writer.close() await writer.wait_closed() logger.info(f"Restart signal sent to bot {bot_id}") except Exception as e: logger.error(f"Error signaling restart to bot {bot_id}: {e}") class IcecastBot: """An IRC bot that monitors an Icecast stream and announces track changes. This bot connects to an IRC server and channel, monitors a specified Icecast stream for metadata changes, and announces new tracks. It supports various commands for both regular users and administrators. Features: - Configurable via YAML files - Multiple URL patterns for metadata fetching - Automatic reconnection and error recovery - Admin commands with permission system - Customizable help messages and announcements """ def get_version(): """Get the current version from VERSION file.""" with open('VERSION') as f: return f.read().strip() VERSION = get_version() def __init__(self, config_path: Optional[str] = None): """Initialize the bot with the given configuration. Args: config_path: Path to the YAML configuration file. If None, uses default path. """ # Get a logger for this class self.logger = get_logger("IcecastBot") self.logger.info(f"Initializing IcecastBot with config path: {config_path}") # Store config path for potential restarts self.config_path = config_path # Load config self.logger.info("Loading configuration...") self.config = self.load_config(config_path) self.logger.debug(f"Configuration loaded: {self.config}") # Create unique bot ID from nick and endpoint self.bot_id = f"{self.config['irc']['nick']}_{self.config['stream']['endpoint']}" self.logger.info(f"Bot ID: {self.bot_id}") # Initialize restart manager self.logger.info("Initializing restart manager...") self.restart_manager = RestartManager(self.bot_id) # Set up bot name bot_name = f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}' self.logger.info(f"Bot name: {bot_name}") # Initialize IRC bot with config and bot name self.logger.info("Creating IRC client...") try: self.bot = Client( host=self.config['irc']['host'], port=self.config['irc']['port'], user=self.config['irc']['user'], realname=self.config['irc']['realname'], nick=self.config['irc']['nick'], bot_name=bot_name, config=self.config ) self.logger.info("IRC client created successfully") except Exception as e: self.logger.error(f"Error creating IRC client: {e}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") raise # Set up instance variables self.channel_name = self.config['irc']['channel'] self.stream_url = self.config['stream']['url'] self.stream_endpoint = self.config['stream']['endpoint'] self.current_song = "Unknown" self.reply = self.config['announce']['format'] self.ignore_patterns = self.config['announce']['ignore_patterns'] self.channel = None self.last_health_check = time.time() self.health_check_interval = self.config['stream']['health_check_interval'] # Get command settings from config self.cmd_prefix = self.config.get('commands', {}).get('prefix', '!') self.require_nick_prefix = self.config.get('commands', {}).get('require_nick_prefix', False) self.allow_private_commands = self.config.get('commands', {}).get('allow_private_commands', False) # Get help format templates help_config = self.config.get('help', {}) self.help_specific_format = help_config.get('specific_format', "\x02{prefix}{cmd}\x02: {desc}") self.help_list_format = help_config.get('list_format', "(\x02{cmd}\x02, {desc})") self.help_list_separator = help_config.get('list_separator', " | ") # Control flags self.monitor_task = None self.should_monitor = True self.is_monitoring = False self.should_announce = not self.config.get('announce', {}).get('quiet_on_start', False) # Respect quiet_on_start config if not self.should_announce: self.logger.info("Starting in quiet mode (announcements disabled) due to configuration") self.admin_users = self.config.get('admin', {}).get('users', ['*']) self.current_process = None self.should_exit = False # Initialize command mappings self.pattern_to_method = {} self.method_to_pattern = {} self.admin_commands = set() self.command_handlers = {} # Set up handlers self.setup_handlers() # Build command mappings self._build_command_mappings() @staticmethod def load_config(config_path: Optional[str] = None) -> dict: """Load and validate the bot configuration from a YAML file. Args: config_path: Path to the YAML configuration file. If None, uses default path. Returns: dict: The loaded and validated configuration dictionary with default values applied. """ logger = get_logger("IcecastBot.config") logger.debug(f"Loading config from: {config_path}") if config_path is None: config_path = Path(__file__).parent / 'config.yaml' logger.info(f"No config path provided, using default: {config_path}") # Load config file try: logger.debug(f"Opening config file: {config_path}") with open(config_path) as f: config = yaml.safe_load(f) logger.debug(f"Config loaded successfully") except FileNotFoundError: logger.warning(f"Config file not found: {config_path}, using defaults") config = { 'irc': {}, 'stream': {}, 'announce': { 'format': "\x02Now playing:\x02 {song}", 'ignore_patterns': ["Unknown", "Unable to fetch metadata", "Error fetching metadata"] } } except Exception as e: logger.error(f"Error loading config: {e}") exception("Failed to load configuration") raise return config def should_announce_song(self, song: str) -> bool: """Check if a song should be announced based on configured ignore patterns. Args: song: The song title to check. Returns: bool: True if the song should be announced, False if it matches any ignore patterns. """ try: if not song: self.logger.debug("Empty song title, not announcing") return False if not self.ignore_patterns: self.logger.debug("No ignore patterns configured, announcing all songs") return True # Check each pattern for pattern in self.ignore_patterns: try: if not pattern: continue if not isinstance(pattern, str): self.logger.warning(f"Invalid ignore pattern (not a string): {pattern}") continue if pattern.lower() in song.lower(): self.logger.debug(f"Song '{song}' matched ignore pattern '{pattern}', not announcing") return False except Exception as e: self.logger.error(f"Error checking ignore pattern '{pattern}': {str(e)}") continue self.logger.debug(f"Song '{song}' passed all ignore patterns, will announce") return True except Exception as e: self.logger.error(f"Exception in should_announce_song: {str(e)}") exception("Failed to check if song should be announced") # Default to not announcing if there's an error return False def setup_handlers(self): """Set up all IRC event handlers and command patterns. This method configures: - Connection and join handlers - Command pattern creation - Message debugging - Command handlers (np, help, admin commands) """ @self.bot.on_connected() async def connected(): try: self.channel = await self.bot.join(self.channel_name) except Exception as e: pass if self.should_monitor: await self.start_monitoring() @self.bot.on_join() async def on_join(channel): # Store the channel if not self.channel: self.channel = channel def create_command_pattern(cmd: str) -> re.Pattern: """Create a regex pattern for matching IRC commands. The pattern handles optional nick prefixes (if configured) and command prefixes. Args: cmd: The command name without prefix. Returns: re.Pattern: Compiled regex pattern for matching the command. """ if self.require_nick_prefix: pattern = f"^({self.config['irc']['nick']}[:,] )?{re.escape(self.cmd_prefix)}{cmd}($| .*$)" else: pattern = f"^{re.escape(self.cmd_prefix)}{cmd}($| .*$)" return re.compile(pattern) # Global message handler for debugging @self.bot.on_message() async def debug_message(message): """Handler for all received messages. Args: message: The IRC message object. """ try: # Test each command pattern msg = getattr(message, 'text', None) if msg and isinstance(msg, str): for cmd in ['np', 'help', 'restart', 'quit', 'reconnect', 'stop', 'start']: pattern = create_command_pattern(cmd) if pattern.match(msg): pass except Exception: pass # Command handlers @self.bot.on_message(create_command_pattern('np')) async def now_playing(message): """!np: Show the currently playing song Displays the current song title from the Icecast stream. The song title is fetched from the stream's metadata or JSON status. Args: message: IRC message object """ recipient = getattr(message, 'recipient', None) # Check if recipient is a Channel object is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return try: if is_channel: await recipient.message(self.reply.format(song=self.current_song)) except Exception as e: pass self.command_handlers['now_playing'] = now_playing @self.bot.on_message(create_command_pattern('help')) async def help_command(message): """!help: Show available commands Display all available commands or detailed help for a specific command. Usage: !help [command] Args: message: IRC message object """ recipient = getattr(message, 'recipient', None) # Check if recipient is a Channel object is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return try: # Parse message to check if a specific command was requested msg_text = getattr(message, 'text', '') parts = msg_text.strip().split() if len(parts) > 1: # Specific command help requested pattern = parts[1].lower() if pattern.startswith(self.cmd_prefix): pattern = pattern[len(self.cmd_prefix):] # Remove prefix if included # Find the command handler using our mapping method_name = self.pattern_to_method.get(pattern) if method_name: handler = self.command_handlers.get(method_name) if handler and handler.__doc__: # Get the first line of the docstring first_line = handler.__doc__.strip().split('\n')[0] # Format it using the template and add (admin only) if needed desc = first_line.split(':', 1)[1].strip() if pattern in self.admin_commands: desc = f"{desc} (admin only)" help_text = self.help_specific_format.format( prefix=self.cmd_prefix, cmd=pattern, desc=desc ) # Check if user has permission for this command if pattern in self.admin_commands and not self.is_admin(message.sender): help_text = "You don't have permission to use this command." else: help_text = f"No help available for command: {pattern}" else: help_text = f"Unknown command: {pattern}" else: # Build command list with proper formatting formatted_groups = [] # Add general commands first general_commands = [] for pattern, method_name in self.pattern_to_method.items(): if pattern not in self.admin_commands: # If not an admin command handler = self.command_handlers.get(method_name) if handler and handler.__doc__: first_line = handler.__doc__.strip().split('\n')[0] desc = first_line.split(':', 1)[1].strip() general_commands.append( self.help_list_format.format( cmd=f"{self.cmd_prefix}{pattern}", desc=desc ) ) if general_commands: formatted_groups.append(self.help_list_separator.join(general_commands)) # Add admin commands if user is admin if self.is_admin(message.sender): admin_commands = [] for pattern in sorted(self.admin_commands): method_name = self.pattern_to_method.get(pattern) if method_name: handler = self.command_handlers.get(method_name) if handler and handler.__doc__: first_line = handler.__doc__.strip().split('\n')[0] desc = first_line.split(':', 1)[1].strip() # Don't add (admin only) in the list view admin_commands.append( self.help_list_format.format( cmd=f"{self.cmd_prefix}{pattern}", desc=desc ) ) if admin_commands: formatted_groups.append("Admin: " + self.help_list_separator.join(admin_commands)) help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | " + " | ".join(formatted_groups) if is_channel: await recipient.message(help_text) except Exception as e: pass self.command_handlers['help_command'] = help_command @self.bot.on_message(create_command_pattern('restart')) @self.admin_required async def restart_bot(message): """!restart: Restart the bot (admin only) Gracefully shuts down the bot and signals the bot.sh script to restart it. This ensures a clean restart. Args: message: IRC message object """ await self.stop_monitoring() try: await self.bot.quit("Restarting...") # Signal restart through Unix Domain Socket await RestartManager.signal_restart(self.bot_id) self.should_exit = True except Exception as e: pass self.command_handlers['restart_bot'] = restart_bot @self.bot.on_message(create_command_pattern('quit')) @self.admin_required async def quit_bot(message): """!quit: Shutdown the bot (admin only) Gracefully shuts down the bot and exits without restarting. Args: message: IRC message object """ await self.stop_monitoring() try: await self.bot.quit("Shutting down...") self.should_exit = True except Exception as e: pass self.command_handlers['quit_bot'] = quit_bot @self.bot.on_message(create_command_pattern('reconnect')) @self.admin_required async def reconnect_stream(message): """!reconnect: Reconnect to the stream (admin only) Attempts to reconnect to the stream and verifies the connection. Reports success or failure back to the channel. Args: message: IRC message object """ success = await self.restart_monitoring() if not success and hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Stream reconnection may have failed. Check logs for details.") self.command_handlers['reconnect_stream'] = reconnect_stream @self.bot.on_message(create_command_pattern('stop')) @self.admin_required async def stop_monitoring(message): """!stop: Stop stream monitoring (admin only) Stops monitoring the stream for metadata changes. The bot remains connected to IRC. Args: message: IRC message object """ await self.stop_monitoring() if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Stream monitoring stopped.") self.command_handlers['stop_monitoring'] = stop_monitoring @self.bot.on_message(create_command_pattern('start')) @self.admin_required async def start_monitoring(message): """!start: Start stream monitoring (admin only) Starts monitoring the stream for metadata changes. Will announce new songs in the channel. Args: message: IRC message object """ await self.start_monitoring() if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Stream monitoring started.") self.command_handlers['start_monitoring'] = start_monitoring @self.bot.on_message(create_command_pattern('quiet')) @self.admin_required async def quiet_bot(message): """!quiet: Disable song announcements (admin only) Continues monitoring the stream for metadata changes, but stops announcing songs in the channel. Args: message: IRC message object """ self.should_announce = False self.logger.info("Song announcements disabled by admin command") if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Song announcements disabled. Bot will continue monitoring but remain quiet.") self.command_handlers['quiet_bot'] = quiet_bot @self.bot.on_message(create_command_pattern('unquiet')) @self.admin_required async def unquiet_bot(message): """!unquiet: Enable song announcements (admin only) Resumes announcing songs in the channel. The bot must already be monitoring the stream. Args: message: IRC message object """ self.should_announce = True self.logger.info("Song announcements enabled by admin command") if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Song announcements enabled. Bot will now announce songs.") self.command_handlers['unquiet_bot'] = unquiet_bot def _build_command_mappings(self): """Build bidirectional mappings between command patterns and method names. Creates two mappings: - pattern_to_method: Maps IRC command pattern (e.g. 'np') to method name - method_to_pattern: Maps method name to IRC command pattern Command patterns are extracted from the first line of each handler's docstring, which must be in the format "!command: description". """ for method_name, handler in self.command_handlers.items(): if not handler.__doc__: continue # Check if this is a command handler by looking at the docstring if handler.__doc__.strip().startswith('!'): # Extract command pattern from docstring first_line = handler.__doc__.strip().split('\n')[0] pattern = first_line.split(':', 1)[0].strip('!') # Store both mappings self.pattern_to_method[pattern] = method_name self.method_to_pattern[method_name] = pattern # Check if this is an admin command by looking at the decorator if hasattr(handler, '__closure__') and handler.__closure__: for cell in handler.__closure__: if isinstance(cell.cell_contents, type(self.admin_required)): self.admin_commands.add(pattern) break def is_admin(self, user: str) -> bool: """Check if a user has admin privileges. Args: user: Full IRC user string (nickname!username@hostname) or User object. Returns: bool: True if user has admin privileges, False otherwise. """ try: if hasattr(user, 'name'): nickname = user.name else: nickname = user.split('!')[0] if '!' in user else user except Exception as e: return False return '*' in self.admin_users or nickname in self.admin_users async def start_monitoring(self): """Start the metadata monitoring task. Creates an asyncio task to monitor the stream for metadata changes. Only starts if not already monitoring. """ if not self.is_monitoring: self.should_monitor = True self.monitor_task = asyncio.create_task(self.monitor_metadata()) self.is_monitoring = True async def stop_monitoring(self): """Stop the metadata monitoring task. Terminates the curl subprocess if running and cancels the monitoring task. Ensures proper cleanup of resources. """ self.should_monitor = False self.is_monitoring = False # First, terminate the subprocess if it exists if self.current_process and self.current_process.returncode is None: try: self.current_process.terminate() try: await asyncio.wait_for(self.current_process.wait(), timeout=2.0) except asyncio.TimeoutError: self.current_process.kill() await self.current_process.wait() except Exception as e: pass finally: self.current_process = None # Then cancel the monitoring task if self.monitor_task: self.monitor_task.cancel() try: await self.monitor_task except asyncio.CancelledError: pass except Exception as e: pass finally: self.monitor_task = None async def restart_monitoring(self): """Restart the metadata monitoring task and verify the reconnection. Stops the current monitoring task, starts a new one, and verifies that metadata can be fetched successfully. Returns: bool: True if reconnection was successful and verified, False otherwise. """ await self.stop_monitoring() # Store the channel for status updates notify_channel = self.channel try: await self.start_monitoring() # Wait for up to 10 seconds to confirm data is being received start_time = time.time() while time.time() - start_time < 10: if self.current_process and self.current_process.returncode is None: # Try to fetch metadata to confirm connection test_song = await self.fetch_json_metadata() if test_song and "Unable to fetch metadata" not in test_song: if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message("Stream reconnected successfully.") return True await asyncio.sleep(1) # If we get here, the connection wasn't confirmed if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message("Stream reconnection attempt completed, but status uncertain. Check logs for details.") return False except Exception as e: if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message(f"Failed to reconnect to stream: {str(e)}") return False async def fetch_json_metadata(self): """Fetch metadata from the Icecast JSON status endpoint. Tries multiple URL patterns to find the correct status endpoint. Handles connection errors and JSON parsing gracefully. Returns: str: The current song title, or an error message if fetching failed. """ try: self.logger.info(f"Fetching metadata from {self.stream_url}/{self.stream_endpoint}") # Try different URL patterns base_urls = [ self.stream_url, # Original URL self.stream_url.replace('live.', 'listen.'), # Try listen. instead of live. '/'.join(self.stream_url.split('/')[:-1]) if '/' in self.stream_url else self.stream_url # Try parent path ] for base_url in base_urls: try: url = f"{base_url}/status-json.xsl" self.logger.debug(f"Trying URL: {url}") async with aiohttp.ClientSession() as session: async with session.get(url, timeout=10) as response: if response.status == 200: data = await response.text() self.logger.debug(f"Received response from {url}") try: json_data = json.loads(data) if 'icestats' in json_data: sources = json_data['icestats'].get('source', []) if not isinstance(sources, list): sources = [sources] # Find our stream for src in sources: if src.get('listenurl', '').endswith(self.stream_endpoint): title = src.get('title') or src.get('song') or src.get('current_song') if title: self.logger.debug(f"Found title: {title}") return title except json.JSONDecodeError as e: self.logger.warning(f"JSON decode error for {url}: {str(e)}") continue except aiohttp.ClientError as e: self.logger.warning(f"Client error for {url}: {str(e)}") continue except json.JSONDecodeError as e: self.logger.warning(f"JSON decode error for {url}: {str(e)}") continue except asyncio.TimeoutError: self.logger.warning(f"Timeout fetching metadata from {url}") continue except Exception as e: self.logger.error(f"Unexpected error fetching from {url}: {str(e)}") continue self.logger.warning("All URL patterns failed, returning 'Unable to fetch metadata'") return "Unable to fetch metadata" except Exception as e: self.logger.error(f"Exception in fetch_json_metadata: {str(e)}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") return f"Error fetching metadata: {str(e)}" async def monitor_metadata(self): """Monitor the Icecast stream for metadata changes. Uses curl to read the stream and detect metadata changes. Handles stream errors, reconnection, and health checks. Announces new songs when detected. """ await asyncio.sleep(5) while self.should_monitor: try: cmd = [ 'curl', '-s', '-H', 'Icy-MetaData: 1', '--no-buffer', f"{self.stream_url}/{self.stream_endpoint}" ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) self.current_process = process # Store the process reference self.logger.debug(f"Started curl process to monitor stream: {self.stream_url}/{self.stream_endpoint}") last_data_received = time.time() buffer = b"" last_json_check = time.time() json_check_interval = 60 # Fallback interval if ICY updates fail data_timeout = 180 # 3 minutes without data is considered a failure empty_chunks_count = 0 max_empty_chunks = 5 # After 5 empty chunks in a row, reconnect while True: try: chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0) if chunk: last_data_received = time.time() buffer += chunk empty_chunks_count = 0 # Reset counter on successful data else: empty_chunks_count += 1 if empty_chunks_count >= max_empty_chunks: self.logger.warning(f"Received {max_empty_chunks} empty chunks in a row, reconnecting") break if time.time() - last_data_received > data_timeout: self.logger.warning(f"Data timeout exceeded ({data_timeout}s), reconnecting") break current_time = time.time() # Periodic health check if current_time - self.last_health_check >= self.health_check_interval: self.logger.debug("Performing periodic health check") self.last_health_check = current_time # Look for metadata marker but fetch from JSON if b"StreamTitle='" in buffer: try: self.logger.debug("Detected StreamTitle marker in buffer, fetching metadata") new_song = await self.fetch_json_metadata() # Check if we should announce the song if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song: self.logger.info(f"Song changed from '{self.current_song}' to '{new_song}'") self.current_song = new_song # Try to announce the song try: await self.announce_song(new_song) except Exception as e: self.logger.error(f"Error announcing song: {str(e)}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") else: # No song change or unable to fetch metadata self.logger.debug(f"No song change detected or unable to fetch metadata: {new_song}") # Clear buffer after metadata marker marker_pos = buffer.find(b"StreamTitle='") end_pos = buffer.find(b"';", marker_pos) if end_pos > marker_pos: buffer = buffer[end_pos + 2:] self.logger.debug("Buffer cleared after metadata marker") else: self.logger.warning(f"Could not find end of metadata marker, truncating buffer") buffer = buffer[-8192:] # Keep last 8KB to avoid losing the end marker # Update last check time last_json_check = current_time except Exception as e: self.logger.error(f"Error processing metadata marker: {str(e)}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") # Reset buffer to avoid getting stuck in a loop buffer = b"" # Keep buffer size reasonable if len(buffer) > 65536: buffer = buffer[-32768:] self.logger.debug("Buffer size exceeded limit, truncated to 32KB") # Fallback JSON check if ICY updates aren't coming through if current_time - last_json_check >= json_check_interval: try: self.logger.debug("Performing fallback JSON check") new_song = await self.fetch_json_metadata() if "Unable to fetch metadata" in new_song: self.logger.warning("Unable to fetch metadata in fallback check") if time.time() - last_data_received > data_timeout: self.logger.warning("Data timeout exceeded, breaking loop") break elif new_song and new_song != self.current_song: self.logger.info(f"Song changed in fallback check from '{self.current_song}' to '{new_song}'") self.current_song = new_song try: await self.announce_song(new_song) except Exception as e: self.logger.error(f"Error announcing song in fallback check: {str(e)}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") else: self.logger.debug(f"No song change detected in fallback check: {new_song}") last_json_check = current_time except Exception as e: self.logger.error(f"Error in fallback JSON check: {str(e)}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") # Still update the check time to avoid rapid retries last_json_check = current_time await asyncio.sleep(0.1) except asyncio.TimeoutError: self.logger.warning("Timeout reading from stream") if time.time() - last_data_received > data_timeout: self.logger.warning(f"Data timeout exceeded ({data_timeout}s), reconnecting") break continue except Exception as e: self.logger.error(f"Error reading from stream: {str(e)}") break # Check if process is still running and terminate if needed if process.returncode is None: try: self.logger.debug("Terminating curl process") process.terminate() await asyncio.wait_for(process.wait(), timeout=5.0) except asyncio.TimeoutError: self.logger.warning("Timeout waiting for curl process to terminate") except Exception as e: self.logger.error(f"Error terminating curl process: {str(e)}") finally: self.current_process = None self.logger.info("Reconnecting to stream after short delay") await asyncio.sleep(5) except Exception as e: self.logger.error(f"Error in monitor_metadata: {str(e)}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") await asyncio.sleep(5) async def announce_song(self, song: str): """Announce a song in the IRC channel. Args: song: The song title to announce. Only announces if: - The channel object is valid - The song doesn't match any ignore patterns - The announcement format is configured - The should_announce flag is True """ try: self.logger.info(f"Attempting to announce song: {song}") if not self.should_announce: self.logger.info(f"Song announcements are disabled, not announcing: {song}") return if self.channel and self.should_announce_song(song): self.logger.debug(f"Song passed filters, preparing to announce") # Use the stored channel object directly if hasattr(self.channel, 'name') and self.channel.name.startswith('#'): try: formatted_message = self.reply.format(song=song) self.logger.debug(f"Sending message to channel {self.channel.name}: {formatted_message}") await self.channel.message(self.reply.format(song=song)) self.logger.info(f"Successfully announced song: {song}") except Exception as e: self.logger.error(f"Error sending message to channel: {str(e)}") exception("Failed to send message to channel") else: self.logger.warning(f"Channel object invalid or not a channel: {self.channel}") else: if not self.channel: self.logger.warning("Channel object is None or invalid") elif not self.should_announce_song(song): self.logger.debug(f"Song '{song}' matched ignore patterns, not announcing") except Exception as e: self.logger.error(f"Exception in announce_song: {str(e)}") exception("Failed to announce song") async def start(self): """Start the IRC bot and begin processing events.""" self.logger.info("Starting IcecastBot...") try: # Create a state file for the manager to detect state_file = Path(tempfile.gettempdir()) / 'icecast-irc-bots.json' try: # Load existing state if it exists state = {} if state_file.exists(): try: with open(state_file, 'r') as f: state = json.load(f) except json.JSONDecodeError: # File exists but is not valid JSON state = {} # Add this bot to the state state[self.bot_id] = { 'pid': os.getpid(), 'config': str(self.config_path) } # Save the state with open(state_file, 'w') as f: json.dump(state, f) self.logger.info(f"Created state file at {state_file}") # Register a signal handler to remove this bot from the state file on exit def cleanup_state_file(signum, frame): try: if state_file.exists(): with open(state_file, 'r') as f: current_state = json.load(f) if self.bot_id in current_state: del current_state[self.bot_id] with open(state_file, 'w') as f: json.dump(current_state, f) self.logger.info(f"Removed {self.bot_id} from state file") except Exception as e: self.logger.error(f"Error cleaning up state file: {e}") sys.exit(0) # Register signal handlers signal.signal(signal.SIGTERM, cleanup_state_file) signal.signal(signal.SIGINT, cleanup_state_file) except Exception as e: self.logger.error(f"Error creating state file: {e}") # Start the restart manager self.logger.info("Starting restart manager...") await self.restart_manager.start() self.logger.info("Restart manager started") # Start the bot self.logger.info("Starting IRC bot...") await self.bot.run() self.logger.info("IRC bot started") except Exception as e: self.logger.error(f"Error starting bot: {e}") import traceback self.logger.error(f"Traceback: {traceback.format_exc()}") finally: self.logger.info("In start() finally block") if self.should_exit: self.logger.info("Bot should exit, cleaning up...") # Clean up any remaining tasks if self.monitor_task: self.logger.info("Canceling monitor task...") self.monitor_task.cancel() try: await self.monitor_task except asyncio.CancelledError: pass # Clean up restart manager self.logger.info("Cleaning up restart manager...") self.restart_manager.cleanup() self.logger.info("Restart manager cleaned up") def format_help_section(self, section_config: dict, prefix: str) -> List[str]: """Format a help section according to the template. Extracts the first line of each command's docstring to use as the help text. Falls back to the template's command descriptions if docstring is not available. Args: section_config: Configuration dictionary for the section. prefix: Command prefix to use. Returns: List[str]: List of formatted help lines for each command. """ commands = [] for cmd, desc in section_config['commands'].items(): # Try to get the docstring from the command handler handler = None for name, method in inspect.getmembers(self, predicate=inspect.ismethod): if name.endswith(f"_{cmd}"): # e.g. now_playing for np handler = method break if handler and handler.__doc__: # Extract the first line of the docstring first_line = handler.__doc__.strip().split('\n')[0] # Remove the command prefix and colon desc = first_line.split(':', 1)[1].strip() commands.append(section_config['format'].format( cmd=f"\x02{prefix}{cmd}\x02", # Bold the command desc=desc )) return commands async def help_command_fallback(self, message): """Fallback help command implementation using hardcoded format. Used when the help template is not configured or fails to process. Args: message: The IRC message object that triggered this command. """ try: prefix = self.cmd_prefix # Format commands with bold prefix and aligned descriptions base_cmds = f"\x02{prefix}np\x02 (current song) • \x02{prefix}help\x02 (this help)" help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | Commands: {base_cmds}" if self.is_admin(message.sender): admin_cmds = ( f"\x02{prefix}start\x02/\x02stop\x02 (monitoring) • " f"\x02{prefix}reconnect\x02 (stream) • " f"\x02{prefix}restart\x02 (bot) • " f"\x02{prefix}quit\x02 (shutdown)" ) help_text += f" | Admin: {admin_cmds}" if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message(help_text) except Exception as e: pass def admin_required(self, f): """Decorator to mark a command as requiring admin privileges. Also automatically adds the command to the admin_commands set for help message grouping. Args: f: The command handler function to wrap. Returns: The wrapped function that checks for admin privileges. """ async def wrapped(message, *args, **kwargs): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return return await f(message, *args, **kwargs) # Copy the docstring and other attributes wrapped.__doc__ = f.__doc__ wrapped.__name__ = f.__name__ # Add the command pattern to admin_commands set if f.__doc__ and f.__doc__.strip().startswith('!'): pattern = f.__doc__.strip().split(':', 1)[0].strip('!') self.admin_commands.add(pattern) return wrapped async def run_multiple_bots(config_paths: List[str]): """Run multiple bot instances concurrently. Each bot runs independently and can be stopped without affecting others. """ bots = [] tasks = [] for config_path in config_paths: try: bot = IcecastBot(config_path) bots.append(bot) # Create task for each bot tasks.append(asyncio.create_task(run_single_bot(bot))) except Exception as e: pass if not bots: pass return # Wait for all bots to complete await asyncio.gather(*tasks) async def run_single_bot(bot: IcecastBot): """Run a single bot instance. Args: bot: The IcecastBot instance to run """ bot.logger.info(f"Running single bot with ID: {bot.bot_id}") try: bot.logger.info("Starting bot...") # Start a background task to check for quiet/unquiet requests async def check_quiet_unquiet(): while True: try: # Check for quiet request if bot.restart_manager.quiet_requested: bot.logger.info("Processing quiet request") bot.should_announce = False bot.restart_manager.quiet_requested = False if bot.channel and hasattr(bot.channel, 'name') and bot.channel.name.startswith('#'): await bot.channel.message("Song announcements disabled via terminal command.") # Check for unquiet request if bot.restart_manager.unquiet_requested: bot.logger.info("Processing unquiet request") bot.should_announce = True bot.restart_manager.unquiet_requested = False if bot.channel and hasattr(bot.channel, 'name') and bot.channel.name.startswith('#'): await bot.channel.message("Song announcements enabled via terminal command.") except Exception as e: bot.logger.error(f"Error in quiet/unquiet check: {e}") await asyncio.sleep(1) # Check every second # Start the background task quiet_check_task = asyncio.create_task(check_quiet_unquiet()) # Start the bot await bot.start() bot.logger.info("Bot start completed") # Cancel the quiet check task quiet_check_task.cancel() try: await quiet_check_task except asyncio.CancelledError: pass # Check if we should restart this bot if bot.restart_manager.should_restart: bot.logger.info("Bot should restart") # Clean up bot.logger.info("Cleaning up restart manager...") bot.restart_manager.cleanup() # Create and start a new instance bot.logger.info("Creating new bot instance...") new_bot = IcecastBot(bot.config_path) bot.logger.info("Starting new bot instance...") await run_single_bot(new_bot) # If should_exit is True but should_restart is False, just exit cleanly elif bot.should_exit: bot.logger.info("Bot should exit cleanly") bot.restart_manager.cleanup() except Exception as e: bot.logger.error(f"Error in run_single_bot: {e}") import traceback bot.logger.error(f"Traceback: {traceback.format_exc()}") finally: bot.logger.info("In run_single_bot finally block") # Ensure cleanup happens if bot.monitor_task: bot.logger.info("Canceling monitor task...") bot.monitor_task.cancel() try: await bot.monitor_task bot.logger.info("Monitor task canceled") except asyncio.CancelledError: bot.logger.info("Monitor task canceled with CancelledError") pass if __name__ == "__main__": # Initialize the logger logger = get_logger("main") logger.info("Starting Icecast IRC Bot...") parser = argparse.ArgumentParser(description='Icecast IRC Bot') parser.add_argument('configs', nargs='*', help='Paths to config files') parser.add_argument('--config', type=str, help='Path to single config file') parser.add_argument('--irc-host', type=str, help='IRC server host') parser.add_argument('--irc-port', type=int, help='IRC server port') parser.add_argument('--irc-nick', type=str, help='IRC nickname') parser.add_argument('--irc-channel', type=str, help='IRC channel') parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)') parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)') parser.add_argument('--cmd-prefix', type=str, help='Command prefix character(s)') args = parser.parse_args() logger.info(f"Arguments parsed: {args}") # Set up the event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) logger.info("Event loop created") async def run_bot(): try: logger.info("Starting bot...") if args.configs: # Multi-bot mode logger.info(f"Running in multi-bot mode with configs: {args.configs}") await run_multiple_bots(args.configs) else: # Single-bot mode logger.info(f"Running in single-bot mode with config: {args.config}") bot = IcecastBot(args.config) logger.info("Bot instance created") # Apply any command line overrides to the config if args.irc_host: bot.config['irc']['host'] = args.irc_host if args.irc_port: bot.config['irc']['port'] = args.irc_port if args.irc_nick: bot.config['irc']['nick'] = args.irc_nick if args.irc_channel: bot.config['irc']['channel'] = args.irc_channel if args.stream_url: bot.config['stream']['url'] = args.stream_url if args.stream_endpoint: bot.config['stream']['endpoint'] = args.stream_endpoint if args.cmd_prefix: if 'commands' not in bot.config: bot.config['commands'] = {} bot.config['commands']['prefix'] = args.cmd_prefix logger.info("Starting single bot...") await run_single_bot(bot) except Exception as e: logger.error(f"Error in run_bot: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") sys.exit(1) try: # Run the bot logger.info("Running event loop...") loop.run_until_complete(run_bot()) except Exception as e: logger.error(f"Error in main: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") finally: try: logger.info("Cleaning up...") # Cancel any remaining tasks for task in asyncio.all_tasks(loop): task.cancel() # Give tasks a chance to cancel loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)) # Finally close the loop loop.close() logger.info("Cleanup complete") except Exception as e: logger.error(f"Error during cleanup: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}")