#!/usr/bin/env python3 import re import json import os import sys import asyncio import aiohttp import time import argparse import yaml from pathlib import Path from typing import List, Optional import sys import inspect import socket import tempfile import signal # ANSI color codes for backward compatibility if needed class Colors: BLUE = "\033[94m" CYAN = "\033[96m" GREEN = "\033[92m" YELLOW = "\033[93m" MAGENTA = "\033[95m" RED = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" WHITE = "\033[97m" ORANGE = "\033[38;5;208m" from asif import Client # Patch asif's Client class def patch_client_bg(self, coro): """Patch for Client._bg to properly handle SystemExit""" async def runner(): try: await coro except SystemExit: raise # Re-raise SystemExit except Exception: pass return asyncio.ensure_future(runner()) # Patch asif's Client class def silent_client_init(self, *args, bot_name: str = None, config: dict = None, **kwargs): # Patch the _bg method self._bg = patch_client_bg.__get__(self) # Call the original __init__ original_init(self, *args, **kwargs) # Save original __init__ and replace it original_init = Client.__init__ Client.__init__ = silent_client_init import asyncio import aiohttp import time import argparse import yaml from pathlib import Path from typing import List, Optional import sys import inspect import socket import tempfile class RestartManager: """Manages bot restarts using Unix Domain Sockets. This class provides a clean way to handle bot restarts without using flag files. Each bot instance gets its own Unix Domain Socket. """ def __init__(self, bot_id: str): """Initialize the restart manager. Args: bot_id: Unique identifier for this bot instance """ self.bot_id = bot_id self.socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock" self.server = None self.should_restart = False async def start(self): """Start the restart manager server.""" # Clean up any existing socket if self.socket_path.exists(): self.socket_path.unlink() # Ensure the parent directory exists self.socket_path.parent.mkdir(parents=True, exist_ok=True) # Create the Unix Domain Socket server try: self.server = await asyncio.start_unix_server( self._handle_restart_request, str(self.socket_path) ) print(f"Restart manager server started at {self.socket_path}") except Exception as e: print(f"Error starting restart manager server: {e}") # Continue without the restart manager pass async def _handle_restart_request(self, reader, writer): """Handle an incoming restart request.""" try: data = await reader.read() if data == b'restart': self.should_restart = True writer.close() await writer.wait_closed() except Exception as e: pass def cleanup(self): """Clean up the socket file.""" try: if self.server: self.server.close() if self.socket_path.exists(): self.socket_path.unlink() except Exception as e: pass @staticmethod async def signal_restart(bot_id: str): """Signal a specific bot to restart. Args: bot_id: ID of the bot to restart """ socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock" try: reader, writer = await asyncio.open_unix_connection(str(socket_path)) writer.write(b'restart') await writer.drain() writer.close() await writer.wait_closed() except Exception as e: pass class IcecastBot: """An IRC bot that monitors an Icecast stream and announces track changes. This bot connects to an IRC server and channel, monitors a specified Icecast stream for metadata changes, and announces new tracks. It supports various commands for both regular users and administrators. Features: - Configurable via YAML files - Multiple URL patterns for metadata fetching - Automatic reconnection and error recovery - Admin commands with permission system - Customizable help messages and announcements """ def get_version(): """Get the current version from VERSION file.""" with open('VERSION') as f: return f.read().strip() VERSION = get_version() def __init__(self, config_path: Optional[str] = None): """Initialize the bot with the given configuration. Args: config_path: Path to the YAML configuration file. If None, uses default path. """ print(f"Initializing IcecastBot with config path: {config_path}") # Store config path for potential restarts self.config_path = config_path # Load config print("Loading configuration...") self.config = self.load_config(config_path) print(f"Configuration loaded: {self.config}") # Create unique bot ID from nick and endpoint self.bot_id = f"{self.config['irc']['nick']}_{self.config['stream']['endpoint']}" print(f"Bot ID: {self.bot_id}") # Initialize restart manager print("Initializing restart manager...") self.restart_manager = RestartManager(self.bot_id) # Set up bot name bot_name = f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}' print(f"Bot name: {bot_name}") # Initialize IRC bot with config and bot name print("Creating IRC client...") try: self.bot = Client( host=self.config['irc']['host'], port=self.config['irc']['port'], user=self.config['irc']['user'], realname=self.config['irc']['realname'], nick=self.config['irc']['nick'], bot_name=bot_name, config=self.config ) print("IRC client created successfully") except Exception as e: print(f"Error creating IRC client: {e}") import traceback traceback.print_exc() raise # Set up instance variables self.channel_name = self.config['irc']['channel'] self.stream_url = self.config['stream']['url'] self.stream_endpoint = self.config['stream']['endpoint'] self.current_song = "Unknown" self.reply = self.config['announce']['format'] self.ignore_patterns = self.config['announce']['ignore_patterns'] self.channel = None self.last_health_check = time.time() self.health_check_interval = self.config['stream']['health_check_interval'] # Get command settings from config self.cmd_prefix = self.config.get('commands', {}).get('prefix', '!') self.require_nick_prefix = self.config.get('commands', {}).get('require_nick_prefix', False) self.allow_private_commands = self.config.get('commands', {}).get('allow_private_commands', False) # Get help format templates help_config = self.config.get('help', {}) self.help_specific_format = help_config.get('specific_format', "\x02{prefix}{cmd}\x02: {desc}") self.help_list_format = help_config.get('list_format', "(\x02{cmd}\x02, {desc})") self.help_list_separator = help_config.get('list_separator', " | ") # Control flags self.monitor_task = None self.should_monitor = True self.is_monitoring = False self.admin_users = self.config.get('admin', {}).get('users', ['*']) self.current_process = None self.should_exit = False # Initialize command mappings self.pattern_to_method = {} self.method_to_pattern = {} self.admin_commands = set() self.command_handlers = {} # Set up handlers self.setup_handlers() # Build command mappings self._build_command_mappings() @staticmethod def load_config(config_path: Optional[str] = None) -> dict: """Load and validate the bot configuration from a YAML file. Args: config_path: Path to the YAML configuration file. If None, uses default path. Returns: dict: The loaded and validated configuration dictionary with default values applied. """ print(f"Loading config from: {config_path}") if config_path is None: config_path = Path(__file__).parent / 'config.yaml' print(f"No config path provided, using default: {config_path}") # Load config file try: print(f"Opening config file: {config_path}") with open(config_path) as f: config = yaml.safe_load(f) print(f"Config loaded successfully: {config}") except FileNotFoundError: print(f"Config file not found: {config_path}, using defaults") config = { 'irc': {}, 'stream': {}, 'announce': { 'format': "\x02Now playing:\x02 {song}", 'ignore_patterns': ["Unknown", "Unable to fetch metadata", "Error fetching metadata"] } } except Exception as e: print(f"Error loading config: {e}") import traceback traceback.print_exc() raise return config def should_announce_song(self, song: str) -> bool: """Check if a song should be announced based on configured ignore patterns. Args: song: The song title to check. Returns: bool: True if the song should be announced, False if it matches any ignore patterns. """ return not any(pattern.lower() in song.lower() for pattern in self.ignore_patterns) def setup_handlers(self): """Set up all IRC event handlers and command patterns. This method configures: - Connection and join handlers - Command pattern creation - Message debugging - Command handlers (np, help, admin commands) """ @self.bot.on_connected() async def connected(): try: self.channel = await self.bot.join(self.channel_name) except Exception as e: pass if self.should_monitor: await self.start_monitoring() @self.bot.on_join() async def on_join(channel): # Store the channel if not self.channel: self.channel = channel def create_command_pattern(cmd: str) -> re.Pattern: """Create a regex pattern for matching IRC commands. The pattern handles optional nick prefixes (if configured) and command prefixes. Args: cmd: The command name without prefix. Returns: re.Pattern: Compiled regex pattern for matching the command. """ if self.require_nick_prefix: pattern = f"^({self.config['irc']['nick']}[:,] )?{re.escape(self.cmd_prefix)}{cmd}($| .*$)" else: pattern = f"^{re.escape(self.cmd_prefix)}{cmd}($| .*$)" return re.compile(pattern) # Global message handler for debugging @self.bot.on_message() async def debug_message(message): """Handler for all received messages. Args: message: The IRC message object. """ try: # Test each command pattern msg = getattr(message, 'text', None) if msg and isinstance(msg, str): for cmd in ['np', 'help', 'restart', 'quit', 'reconnect', 'stop', 'start']: pattern = create_command_pattern(cmd) if pattern.match(msg): pass except Exception: pass # Command handlers @self.bot.on_message(create_command_pattern('np')) async def now_playing(message): """!np: Show the currently playing song Displays the current song title from the Icecast stream. The song title is fetched from the stream's metadata or JSON status. Args: message: IRC message object """ recipient = getattr(message, 'recipient', None) # Check if recipient is a Channel object is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return try: if is_channel: await recipient.message(self.reply.format(song=self.current_song)) except Exception as e: pass self.command_handlers['now_playing'] = now_playing @self.bot.on_message(create_command_pattern('help')) async def help_command(message): """!help: Show available commands Display all available commands or detailed help for a specific command. Usage: !help [command] Args: message: IRC message object """ recipient = getattr(message, 'recipient', None) # Check if recipient is a Channel object is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return try: # Parse message to check if a specific command was requested msg_text = getattr(message, 'text', '') parts = msg_text.strip().split() if len(parts) > 1: # Specific command help requested pattern = parts[1].lower() if pattern.startswith(self.cmd_prefix): pattern = pattern[len(self.cmd_prefix):] # Remove prefix if included # Find the command handler using our mapping method_name = self.pattern_to_method.get(pattern) if method_name: handler = self.command_handlers.get(method_name) if handler and handler.__doc__: # Get the first line of the docstring first_line = handler.__doc__.strip().split('\n')[0] # Format it using the template and add (admin only) if needed desc = first_line.split(':', 1)[1].strip() if pattern in self.admin_commands: desc = f"{desc} (admin only)" help_text = self.help_specific_format.format( prefix=self.cmd_prefix, cmd=pattern, desc=desc ) # Check if user has permission for this command if pattern in self.admin_commands and not self.is_admin(message.sender): help_text = "You don't have permission to use this command." else: help_text = f"No help available for command: {pattern}" else: help_text = f"Unknown command: {pattern}" else: # Build command list with proper formatting formatted_groups = [] # Add general commands first general_commands = [] for pattern, method_name in self.pattern_to_method.items(): if pattern not in self.admin_commands: # If not an admin command handler = self.command_handlers.get(method_name) if handler and handler.__doc__: first_line = handler.__doc__.strip().split('\n')[0] desc = first_line.split(':', 1)[1].strip() general_commands.append( self.help_list_format.format( cmd=f"{self.cmd_prefix}{pattern}", desc=desc ) ) if general_commands: formatted_groups.append(self.help_list_separator.join(general_commands)) # Add admin commands if user is admin if self.is_admin(message.sender): admin_commands = [] for pattern in sorted(self.admin_commands): method_name = self.pattern_to_method.get(pattern) if method_name: handler = self.command_handlers.get(method_name) if handler and handler.__doc__: first_line = handler.__doc__.strip().split('\n')[0] desc = first_line.split(':', 1)[1].strip() # Don't add (admin only) in the list view admin_commands.append( self.help_list_format.format( cmd=f"{self.cmd_prefix}{pattern}", desc=desc ) ) if admin_commands: formatted_groups.append("Admin: " + self.help_list_separator.join(admin_commands)) help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | " + " | ".join(formatted_groups) if is_channel: await recipient.message(help_text) except Exception as e: pass self.command_handlers['help_command'] = help_command @self.bot.on_message(create_command_pattern('restart')) @self.admin_required async def restart_bot(message): """!restart: Restart the bot (admin only) Gracefully shuts down the bot and signals the bot.sh script to restart it. This ensures a clean restart. Args: message: IRC message object """ await self.stop_monitoring() try: await self.bot.quit("Restarting...") # Signal restart through Unix Domain Socket await RestartManager.signal_restart(self.bot_id) self.should_exit = True except Exception as e: pass self.command_handlers['restart_bot'] = restart_bot @self.bot.on_message(create_command_pattern('quit')) @self.admin_required async def quit_bot(message): """!quit: Shutdown the bot (admin only) Gracefully shuts down the bot and exits without restarting. Args: message: IRC message object """ await self.stop_monitoring() try: await self.bot.quit("Shutting down...") self.should_exit = True except Exception as e: pass self.command_handlers['quit_bot'] = quit_bot @self.bot.on_message(create_command_pattern('reconnect')) @self.admin_required async def reconnect_stream(message): """!reconnect: Reconnect to the stream (admin only) Attempts to reconnect to the stream and verifies the connection. Reports success or failure back to the channel. Args: message: IRC message object """ success = await self.restart_monitoring() if not success and hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Stream reconnection may have failed. Check logs for details.") self.command_handlers['reconnect_stream'] = reconnect_stream @self.bot.on_message(create_command_pattern('stop')) @self.admin_required async def stop_monitoring(message): """!stop: Stop stream monitoring (admin only) Stops monitoring the stream for metadata changes. The bot remains connected to IRC. Args: message: IRC message object """ await self.stop_monitoring() if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Stream monitoring stopped.") self.command_handlers['stop_monitoring'] = stop_monitoring @self.bot.on_message(create_command_pattern('start')) @self.admin_required async def start_monitoring(message): """!start: Start stream monitoring (admin only) Starts monitoring the stream for metadata changes. Will announce new songs in the channel. Args: message: IRC message object """ await self.start_monitoring() if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message("Stream monitoring started.") self.command_handlers['start_monitoring'] = start_monitoring def _build_command_mappings(self): """Build bidirectional mappings between command patterns and method names. Creates two mappings: - pattern_to_method: Maps IRC command pattern (e.g. 'np') to method name - method_to_pattern: Maps method name to IRC command pattern Command patterns are extracted from the first line of each handler's docstring, which must be in the format "!command: description". """ for method_name, handler in self.command_handlers.items(): if not handler.__doc__: continue # Check if this is a command handler by looking at the docstring if handler.__doc__.strip().startswith('!'): # Extract command pattern from docstring first_line = handler.__doc__.strip().split('\n')[0] pattern = first_line.split(':', 1)[0].strip('!') # Store both mappings self.pattern_to_method[pattern] = method_name self.method_to_pattern[method_name] = pattern # Check if this is an admin command by looking at the decorator if hasattr(handler, '__closure__') and handler.__closure__: for cell in handler.__closure__: if isinstance(cell.cell_contents, type(self.admin_required)): self.admin_commands.add(pattern) break def is_admin(self, user: str) -> bool: """Check if a user has admin privileges. Args: user: Full IRC user string (nickname!username@hostname) or User object. Returns: bool: True if user has admin privileges, False otherwise. """ try: if hasattr(user, 'name'): nickname = user.name else: nickname = user.split('!')[0] if '!' in user else user except Exception as e: return False return '*' in self.admin_users or nickname in self.admin_users async def start_monitoring(self): """Start the metadata monitoring task. Creates an asyncio task to monitor the stream for metadata changes. Only starts if not already monitoring. """ if not self.is_monitoring: self.should_monitor = True self.monitor_task = asyncio.create_task(self.monitor_metadata()) self.is_monitoring = True async def stop_monitoring(self): """Stop the metadata monitoring task. Terminates the curl subprocess if running and cancels the monitoring task. Ensures proper cleanup of resources. """ self.should_monitor = False self.is_monitoring = False # First, terminate the subprocess if it exists if self.current_process and self.current_process.returncode is None: try: self.current_process.terminate() try: await asyncio.wait_for(self.current_process.wait(), timeout=2.0) except asyncio.TimeoutError: self.current_process.kill() await self.current_process.wait() except Exception as e: pass finally: self.current_process = None # Then cancel the monitoring task if self.monitor_task: self.monitor_task.cancel() try: await self.monitor_task except asyncio.CancelledError: pass except Exception as e: pass finally: self.monitor_task = None async def restart_monitoring(self): """Restart the metadata monitoring task and verify the reconnection. Stops the current monitoring task, starts a new one, and verifies that metadata can be fetched successfully. Returns: bool: True if reconnection was successful and verified, False otherwise. """ await self.stop_monitoring() # Store the channel for status updates notify_channel = self.channel try: await self.start_monitoring() # Wait for up to 10 seconds to confirm data is being received start_time = time.time() while time.time() - start_time < 10: if self.current_process and self.current_process.returncode is None: # Try to fetch metadata to confirm connection test_song = await self.fetch_json_metadata() if test_song and "Unable to fetch metadata" not in test_song: if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message("Stream reconnected successfully.") return True await asyncio.sleep(1) # If we get here, the connection wasn't confirmed if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message("Stream reconnection attempt completed, but status uncertain. Check logs for details.") return False except Exception as e: if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'): await notify_channel.message(f"Failed to reconnect to stream: {str(e)}") return False async def fetch_json_metadata(self): """Fetch metadata from the Icecast JSON status endpoint. Tries multiple URL patterns to find the correct status endpoint. Handles connection errors and JSON parsing gracefully. Returns: str: The current song title, or an error message if fetching failed. """ try: # Try different URL patterns base_urls = [ self.stream_url, # Original URL self.stream_url.replace('live.', 'listen.'), # Try listen. instead of live. '/'.join(self.stream_url.split('/')[:-1]) if '/' in self.stream_url else self.stream_url # Try parent path ] for base_url in base_urls: try: url = f"{base_url}/status-json.xsl" async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: data = await response.text() json_data = json.loads(data) if 'icestats' in json_data: sources = json_data['icestats'].get('source', []) if not isinstance(sources, list): sources = [sources] # Find our stream for src in sources: if src.get('listenurl', '').endswith(self.stream_endpoint): title = src.get('title') or src.get('song') or src.get('current_song') if title: return title except aiohttp.ClientError as e: continue except json.JSONDecodeError as e: continue return "Unable to fetch metadata" except Exception as e: return "Error fetching metadata" async def monitor_metadata(self): """Monitor the Icecast stream for metadata changes. Uses curl to read the stream and detect metadata changes. Handles stream errors, reconnection, and health checks. Announces new songs when detected. """ await asyncio.sleep(5) while self.should_monitor: try: cmd = [ 'curl', '-s', '-H', 'Icy-MetaData: 1', '--no-buffer', f"{self.stream_url}/{self.stream_endpoint}" ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) self.current_process = process # Store the process reference last_data_received = time.time() buffer = b"" last_json_check = time.time() json_check_interval = 60 # Fallback interval if ICY updates fail data_timeout = 180 # 3 minutes without data is considered a failure empty_chunks_count = 0 max_empty_chunks = 5 # After 5 empty chunks in a row, reconnect while True: try: chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0) if chunk: last_data_received = time.time() buffer += chunk empty_chunks_count = 0 # Reset counter on successful data else: empty_chunks_count += 1 if empty_chunks_count >= max_empty_chunks: break if time.time() - last_data_received > data_timeout: break current_time = time.time() # Periodic health check if current_time - self.last_health_check >= self.health_check_interval: pass self.last_health_check = current_time # Look for metadata marker but fetch from JSON if b"StreamTitle='" in buffer: new_song = await self.fetch_json_metadata() if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song: self.current_song = new_song await self.announce_song(new_song) # Clear buffer after metadata marker buffer = buffer[buffer.find(b"';", buffer.find(b"StreamTitle='")) + 2:] last_json_check = current_time # Keep buffer size reasonable if len(buffer) > 65536: buffer = buffer[-32768:] # Fallback JSON check if ICY updates aren't coming through if current_time - last_json_check >= json_check_interval: new_song = await self.fetch_json_metadata() if "Unable to fetch metadata" in new_song: if time.time() - last_data_received > data_timeout: break if new_song and new_song != self.current_song: self.current_song = new_song await self.announce_song(new_song) last_json_check = current_time await asyncio.sleep(0.1) except asyncio.TimeoutError: if time.time() - last_data_received > data_timeout: break continue except Exception as e: break # Check if process is still running and terminate if needed if process.returncode is None: try: process.terminate() await asyncio.wait_for(process.wait(), timeout=5.0) except asyncio.TimeoutError: pass except Exception as e: pass finally: self.current_process = None await asyncio.sleep(5) except Exception as e: await asyncio.sleep(5) async def announce_song(self, song: str): """Announce a song in the IRC channel. Args: song: The song title to announce. Only announces if: - The channel object is valid - The song doesn't match any ignore patterns - The announcement format is configured """ try: if self.channel and self.should_announce_song(song): # Use the stored channel object directly if hasattr(self.channel, 'name') and self.channel.name.startswith('#'): await self.channel.message(self.reply.format(song=song)) except Exception as e: pass async def start(self): """Start the IRC bot and begin processing events.""" print("Starting IcecastBot...") try: # Create a state file for the manager to detect state_file = Path(tempfile.gettempdir()) / 'icecast-irc-bots.json' try: # Load existing state if it exists state = {} if state_file.exists(): try: with open(state_file, 'r') as f: state = json.load(f) except json.JSONDecodeError: # File exists but is not valid JSON state = {} # Add this bot to the state state[self.bot_id] = { 'pid': os.getpid(), 'config': str(self.config_path) } # Save the state with open(state_file, 'w') as f: json.dump(state, f) print(f"Created state file at {state_file}") # Register a signal handler to remove this bot from the state file on exit def cleanup_state_file(signum, frame): try: if state_file.exists(): with open(state_file, 'r') as f: current_state = json.load(f) if self.bot_id in current_state: del current_state[self.bot_id] with open(state_file, 'w') as f: json.dump(current_state, f) print(f"Removed {self.bot_id} from state file") except Exception as e: print(f"Error cleaning up state file: {e}") sys.exit(0) # Register signal handlers signal.signal(signal.SIGTERM, cleanup_state_file) signal.signal(signal.SIGINT, cleanup_state_file) except Exception as e: print(f"Error creating state file: {e}") # Start the restart manager print("Starting restart manager...") await self.restart_manager.start() print("Restart manager started") # Start the bot print("Starting IRC bot...") await self.bot.run() print("IRC bot started") except Exception as e: print(f"Error starting bot: {e}") import traceback traceback.print_exc() finally: print("In start() finally block") if self.should_exit: print("Bot should exit, cleaning up...") # Clean up any remaining tasks if self.monitor_task: print("Canceling monitor task...") self.monitor_task.cancel() try: await self.monitor_task except asyncio.CancelledError: pass # Clean up restart manager print("Cleaning up restart manager...") self.restart_manager.cleanup() print("Restart manager cleaned up") def format_help_section(self, section_config: dict, prefix: str) -> List[str]: """Format a help section according to the template. Extracts the first line of each command's docstring to use as the help text. Falls back to the template's command descriptions if docstring is not available. Args: section_config: Configuration dictionary for the section. prefix: Command prefix to use. Returns: List[str]: List of formatted help lines for each command. """ commands = [] for cmd, desc in section_config['commands'].items(): # Try to get the docstring from the command handler handler = None for name, method in inspect.getmembers(self, predicate=inspect.ismethod): if name.endswith(f"_{cmd}"): # e.g. now_playing for np handler = method break if handler and handler.__doc__: # Extract the first line of the docstring first_line = handler.__doc__.strip().split('\n')[0] # Remove the command prefix and colon desc = first_line.split(':', 1)[1].strip() commands.append(section_config['format'].format( cmd=f"\x02{prefix}{cmd}\x02", # Bold the command desc=desc )) return commands async def help_command_fallback(self, message): """Fallback help command implementation using hardcoded format. Used when the help template is not configured or fails to process. Args: message: The IRC message object that triggered this command. """ try: prefix = self.cmd_prefix # Format commands with bold prefix and aligned descriptions base_cmds = f"\x02{prefix}np\x02 (current song) • \x02{prefix}help\x02 (this help)" help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | Commands: {base_cmds}" if self.is_admin(message.sender): admin_cmds = ( f"\x02{prefix}start\x02/\x02stop\x02 (monitoring) • " f"\x02{prefix}reconnect\x02 (stream) • " f"\x02{prefix}restart\x02 (bot) • " f"\x02{prefix}quit\x02 (shutdown)" ) help_text += f" | Admin: {admin_cmds}" if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): await message.recipient.message(help_text) except Exception as e: pass def admin_required(self, f): """Decorator to mark a command as requiring admin privileges. Also automatically adds the command to the admin_commands set for help message grouping. Args: f: The command handler function to wrap. Returns: The wrapped function that checks for admin privileges. """ async def wrapped(message, *args, **kwargs): recipient = getattr(message, 'recipient', None) is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') if not self.allow_private_commands and not is_channel: return if not self.is_admin(message.sender): if is_channel: await recipient.message("You don't have permission to use this command.") return return await f(message, *args, **kwargs) # Copy the docstring and other attributes wrapped.__doc__ = f.__doc__ wrapped.__name__ = f.__name__ # Add the command pattern to admin_commands set if f.__doc__ and f.__doc__.strip().startswith('!'): pattern = f.__doc__.strip().split(':', 1)[0].strip('!') self.admin_commands.add(pattern) return wrapped async def run_multiple_bots(config_paths: List[str]): """Run multiple bot instances concurrently. Each bot runs independently and can be stopped without affecting others. """ bots = [] tasks = [] for config_path in config_paths: try: bot = IcecastBot(config_path) bots.append(bot) # Create task for each bot tasks.append(asyncio.create_task(run_single_bot(bot))) except Exception as e: pass if not bots: pass return # Wait for all bots to complete await asyncio.gather(*tasks) async def run_single_bot(bot: IcecastBot): """Run a single bot instance. Args: bot: The IcecastBot instance to run """ print(f"Running single bot with ID: {bot.bot_id}") try: print("Starting bot...") await bot.start() print("Bot start completed") # Check if we should restart this bot if bot.restart_manager.should_restart: print("Bot should restart") # Clean up print("Cleaning up restart manager...") bot.restart_manager.cleanup() # Create and start a new instance print("Creating new bot instance...") new_bot = IcecastBot(bot.config_path) print("Starting new bot instance...") await run_single_bot(new_bot) # If should_exit is True but should_restart is False, just exit cleanly elif bot.should_exit: print("Bot should exit cleanly") bot.restart_manager.cleanup() except Exception as e: print(f"Error in run_single_bot: {e}") import traceback traceback.print_exc() finally: print("In run_single_bot finally block") # Ensure cleanup happens if bot.monitor_task: print("Canceling monitor task...") bot.monitor_task.cancel() try: await bot.monitor_task print("Monitor task canceled") except asyncio.CancelledError: print("Monitor task canceled with CancelledError") pass if __name__ == "__main__": print("Starting Icecast IRC Bot...") parser = argparse.ArgumentParser(description='Icecast IRC Bot') parser.add_argument('configs', nargs='*', help='Paths to config files') parser.add_argument('--config', type=str, help='Path to single config file') parser.add_argument('--irc-host', type=str, help='IRC server host') parser.add_argument('--irc-port', type=int, help='IRC server port') parser.add_argument('--irc-nick', type=str, help='IRC nickname') parser.add_argument('--irc-channel', type=str, help='IRC channel') parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)') parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)') parser.add_argument('--cmd-prefix', type=str, help='Command prefix character(s)') args = parser.parse_args() print(f"Arguments parsed: {args}") # Set up the event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) print("Event loop created") async def run_bot(): try: print("Starting bot...") if args.configs: # Multi-bot mode print(f"Running in multi-bot mode with configs: {args.configs}") await run_multiple_bots(args.configs) else: # Single-bot mode print(f"Running in single-bot mode with config: {args.config}") bot = IcecastBot(args.config) print("Bot instance created") # Apply any command line overrides to the config if args.irc_host: bot.config['irc']['host'] = args.irc_host if args.irc_port: bot.config['irc']['port'] = args.irc_port if args.irc_nick: bot.config['irc']['nick'] = args.irc_nick if args.irc_channel: bot.config['irc']['channel'] = args.irc_channel if args.stream_url: bot.config['stream']['url'] = args.stream_url if args.stream_endpoint: bot.config['stream']['endpoint'] = args.stream_endpoint if args.cmd_prefix: if 'commands' not in bot.config: bot.config['commands'] = {} bot.config['commands']['prefix'] = args.cmd_prefix print("Starting single bot...") await run_single_bot(bot) except Exception as e: print(f"Error in run_bot: {e}") import traceback traceback.print_exc() sys.exit(1) try: # Run the bot print("Running event loop...") loop.run_until_complete(run_bot()) except Exception as e: print(f"Error in main: {e}") import traceback traceback.print_exc() finally: try: print("Cleaning up...") # Cancel any remaining tasks for task in asyncio.all_tasks(loop): task.cancel() # Give tasks a chance to cancel loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)) # Finally close the loop loop.close() print("Cleanup complete") except Exception as e: print(f"Error during cleanup: {e}") import traceback traceback.print_exc()