diff --git a/DOCS.md b/DOCS.md new file mode 100644 index 0000000..55c258d --- /dev/null +++ b/DOCS.md @@ -0,0 +1,137 @@ +# Icecast IRC Bot Documentation v1.1.0 + +This document is automatically generated from the codebase. + +## Overview + +An IRC bot that monitors an Icecast stream and announces track changes. + +## Configuration + +See `config.yaml.example` for a full example configuration file. + +No command documentation available (help template not found) + +## Methods + +### admin_required + +Decorator to mark a command as requiring admin privileges. + +**Arguments:** + +- `f`: The command handler function to wrap. +- `Returns`: + + +### announce_song + +Announce a song in the IRC channel. + +**Arguments:** + +- `song`: The song title to announce. +- `Only announces if`: + + +### fetch_json_metadata + +Fetch metadata from the Icecast JSON status endpoint. + +**Returns:** + +str: The current song title, or an error message if fetching failed. + + +### format_help_section + +Format a help section according to the template. + +**Arguments:** + +- `section_config`: Configuration dictionary for the section. +- `prefix`: Command prefix to use. +- `Returns`: +- `List[str]`: List of formatted help lines for each command. + + +### get_version + +Get the current version from VERSION file. + + +### help_command_fallback + +Fallback help command implementation using hardcoded format. + +**Arguments:** + +- `message`: The IRC message object that triggered this command. + + +### is_admin + +Check if a user has admin privileges. + +**Arguments:** + +- `user`: Full IRC user string (nickname!username@hostname) or User object. +- `Returns`: +- `bool`: True if user has admin privileges, False otherwise. + + +### load_config + +Load and validate the bot configuration from a YAML file. + +**Arguments:** + +- `config_path`: Path to the YAML configuration file. If None, uses default path. +- `Returns`: +- `dict`: The loaded and validated configuration dictionary with default values applied. + + +### monitor_metadata + +Monitor the Icecast stream for metadata changes. + + +### restart_monitoring + +Restart the metadata monitoring task and verify the reconnection. + +**Returns:** + +bool: True if reconnection was successful and verified, False otherwise. + + +### setup_handlers + +Set up all IRC event handlers and command patterns. + + +### should_announce_song + +Check if a song should be announced based on configured ignore patterns. + +**Arguments:** + +- `song`: The song title to check. +- `Returns`: +- `bool`: True if the song should be announced, False if it matches any ignore patterns. + + +### start + +Start the IRC bot and begin processing events. + + +### start_monitoring + +Start the metadata monitoring task. + + +### stop_monitoring + +Stop the metadata monitoring task. + diff --git a/README.md b/README.md index 7a14e1c..a536287 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Icecast-metadata-IRC-announcer -[![Version](https://img.shields.io/badge/version-1.0.1-blue.svg)](https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases/tag/v1.0.1) +[![Version](https://img.shields.io/badge/dynamic/raw?color=blue&label=version&query=.&url=https%3A%2F%2Fcode.cottongin.xyz%2Fcottongin%2FIcecast-metadata-IRC-announcer%2Fraw%2Fbranch%2Fmaster%2FVERSION)](https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases) A simple asynchronous Python bot that monitors an Icecast stream and announces track changes to an IRC channel. Supports running multiple instances with different configurations. diff --git a/VERSION b/VERSION index 8470eb0..1cc5f65 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.1 \ No newline at end of file +1.1.0 \ No newline at end of file diff --git a/config.yaml.example b/config.yaml.example index 87e2e11..71bc729 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -23,9 +23,17 @@ commands: require_nick_prefix: false # If true, commands must be prefixed with "botname: " or "botname, " allow_private_commands: false # If true, allows commands in private messages +help: # Help message templates + specific_format: "\x02{prefix}{cmd}\x02: {desc}" # Format for specific command help + list_format: "(\x02{cmd}\x02, {desc})" # Format for commands in list + list_separator: " | " # Separator between commands in list + admin: users: # List of users who can use admin commands (use "*" for anyone) - "*" logging: - level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL \ No newline at end of file + level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL + format: "%(asctime)s - %(levelname)s - %(message)s" # Format for console logs + error_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # Format for error logs + datefmt: "%H:%M:%S" # Date/time format for log timestamps \ No newline at end of file diff --git a/generate_docs.py b/generate_docs.py new file mode 100755 index 0000000..267faf2 --- /dev/null +++ b/generate_docs.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +import inspect +import yaml +from pathlib import Path +from main import IcecastBot + +def get_version(): + """Get the current version from VERSION file.""" + try: + with open('VERSION') as f: + return f.read().strip() + except FileNotFoundError: + return "Unknown" + +def format_docstring(obj): + """Format a docstring into markdown.""" + doc = inspect.getdoc(obj) + if not doc: + return "" + + # Split into description and args + parts = doc.split('\n\n') + formatted = [parts[0]] # Description + + for part in parts[1:]: + if part.startswith('Args:'): + formatted.append("\n**Arguments:**\n") + # Parse args section + args = part.replace('Args:', '').strip().split('\n') + for arg in args: + if ':' in arg: + name, desc = arg.split(':', 1) + formatted.append(f"- `{name.strip()}`: {desc.strip()}") + elif part.startswith('Returns:'): + formatted.append("\n**Returns:**\n") + formatted.append(part.replace('Returns:', '').strip()) + + return '\n'.join(formatted) + +def generate_command_docs(config_path='config.yaml.example'): + """Generate command documentation from help templates.""" + try: + with open(config_path) as f: + config = yaml.safe_load(f) + except FileNotFoundError: + return "No command documentation available (config file not found)" + + help_config = config.get('commands', {}).get('help', {}) + if not help_config: + return "No command documentation available (help template not found)" + + docs = ["## Commands\n"] + + # Regular commands + if 'commands' in help_config.get('sections', {}): + docs.append("### Regular Commands\n") + for cmd, desc in help_config['sections']['commands']['commands'].items(): + docs.append(f"- `{cmd}`: {desc}") + docs.append("") + + # Admin commands + if 'admin' in help_config.get('sections', {}): + docs.append("### Admin Commands\n") + docs.append("These commands are only available to users listed in the `admin.users` config section.\n") + for cmd, desc in help_config['sections']['admin']['commands'].items(): + docs.append(f"- `{cmd}`: {desc}") + docs.append("") + + return '\n'.join(docs) + +def generate_docs(): + """Generate full documentation in markdown format.""" + version = get_version() + + docs = [ + f"# Icecast IRC Bot Documentation v{version}\n", + "This document is automatically generated from the codebase.\n", + "## Overview\n", + format_docstring(IcecastBot), + "\n## Configuration\n", + "See `config.yaml.example` for a full example configuration file.\n", + generate_command_docs(), + "\n## Methods\n" + ] + + # Document public methods + for name, method in inspect.getmembers(IcecastBot, predicate=inspect.isfunction): + if not name.startswith('_'): # Only public methods + docs.append(f"### {name}\n") + docs.append(format_docstring(method)) + docs.append("\n") + + # Write to DOCS.md + with open('DOCS.md', 'w') as f: + f.write('\n'.join(docs)) + +if __name__ == '__main__': + generate_docs() \ No newline at end of file diff --git a/icecast-irc-bot-manager.py b/icecast-irc-bot-manager.py new file mode 100644 index 0000000..98b1d57 --- /dev/null +++ b/icecast-irc-bot-manager.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 + +import argparse +import asyncio +import json +import logging +import os +import signal +import sys +import tempfile +from pathlib import Path +from typing import Dict, List, Optional + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger('icecast-irc-bot-manager') + +class BotManager: + """Manages multiple Icecast IRC bot instances.""" + + def __init__(self): + self.bots: Dict[str, asyncio.subprocess.Process] = {} + self.config_dir = Path('/etc/icecast-irc-bot') + self.socket_dir = Path(tempfile.gettempdir()) + + async def start_bot(self, config_path: Path) -> bool: + """Start a bot instance with the given config. + + Args: + config_path: Path to the bot's config file + + Returns: + bool: True if bot was started successfully + """ + try: + # Create unique name for this bot instance + bot_id = config_path.stem + + # Check if bot is already running + if bot_id in self.bots: + logger.warning(f"Bot {bot_id} is already running") + return False + + # Start the bot process + process = await asyncio.create_subprocess_exec( + sys.executable, '-m', 'main', + '--config', str(config_path), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + self.bots[bot_id] = process + logger.info(f"Started bot {bot_id} (PID: {process.pid})") + return True + + except Exception as e: + logger.error(f"Failed to start bot with config {config_path}: {e}") + return False + + async def stop_bot(self, bot_id: str) -> bool: + """Stop a running bot instance. + + Args: + bot_id: ID of the bot to stop + + Returns: + bool: True if bot was stopped successfully + """ + if bot_id not in self.bots: + logger.warning(f"Bot {bot_id} is not running") + return False + + try: + process = self.bots[bot_id] + process.terminate() + try: + await asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + process.kill() + await process.wait() + + del self.bots[bot_id] + logger.info(f"Stopped bot {bot_id}") + return True + + except Exception as e: + logger.error(f"Failed to stop bot {bot_id}: {e}") + return False + + async def restart_bot(self, bot_id: str) -> bool: + """Restart a running bot instance. + + Args: + bot_id: ID of the bot to restart + + Returns: + bool: True if bot was restarted successfully + """ + # Find the config file for this bot + config_path = self.config_dir / f"{bot_id}.yaml" + if not config_path.exists(): + logger.error(f"Config file not found for bot {bot_id}") + return False + + # Stop the bot if it's running + if bot_id in self.bots: + if not await self.stop_bot(bot_id): + return False + + # Start the bot with the same config + return await self.start_bot(config_path) + + async def list_bots(self) -> List[Dict]: + """List all running bot instances. + + Returns: + List[Dict]: List of bot info dictionaries + """ + bot_info = [] + for bot_id, process in self.bots.items(): + info = { + 'id': bot_id, + 'pid': process.pid, + 'running': process.returncode is None + } + bot_info.append(info) + return bot_info + + async def cleanup(self): + """Clean up all running bots.""" + for bot_id in list(self.bots.keys()): + await self.stop_bot(bot_id) + +async def main(): + parser = argparse.ArgumentParser(description='Icecast IRC Bot Manager') + parser.add_argument('--config', type=str, help='Path to config file or directory') + parser.add_argument('command', choices=['start', 'stop', 'restart', 'list'], help='Command to execute') + parser.add_argument('bot_id', nargs='?', help='Bot ID for start/stop/restart commands') + + args = parser.parse_args() + + manager = BotManager() + + try: + if args.command == 'list': + bot_info = await manager.list_bots() + if bot_info: + print(json.dumps(bot_info, indent=2)) + else: + print("No bots running") + + elif args.command == 'start': + if not args.config: + print("Error: --config required for start command") + sys.exit(1) + config_path = Path(args.config) + if config_path.is_dir(): + # Start all bots in directory + for config_file in config_path.glob('*.yaml'): + await manager.start_bot(config_file) + else: + # Start single bot + await manager.start_bot(config_path) + + elif args.command == 'stop': + if not args.bot_id: + print("Error: bot_id required for stop command") + sys.exit(1) + await manager.stop_bot(args.bot_id) + + elif args.command == 'restart': + if not args.bot_id: + print("Error: bot_id required for restart command") + sys.exit(1) + await manager.restart_bot(args.bot_id) + + except KeyboardInterrupt: + logger.info("Shutting down...") + await manager.cleanup() + except Exception as e: + logger.error(f"Unhandled error: {e}") + sys.exit(1) + +if __name__ == '__main__': + # Set up signal handlers + for sig in (signal.SIGTERM, signal.SIGINT): + signal.signal(sig, lambda s, f: sys.exit(0)) + + # Run the manager + asyncio.run(main()) \ No newline at end of file diff --git a/icecast-irc-bot-manager.service b/icecast-irc-bot-manager.service new file mode 100644 index 0000000..e6728d5 --- /dev/null +++ b/icecast-irc-bot-manager.service @@ -0,0 +1,26 @@ +[Unit] +Description=Icecast IRC Bot Manager +After=network.target +Wants=network.target +Documentation=https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer + +[Service] +Type=simple +User=icecast-bot +Group=icecast-bot +Environment=VIRTUAL_ENV=/opt/icecast-irc-bot/venv +Environment=PATH=/opt/icecast-irc-bot/venv/bin:$PATH +ExecStart=/opt/icecast-irc-bot/venv/bin/python3 /usr/local/bin/icecast-irc-bot-manager --config /etc/icecast-irc-bot/config.yaml +Restart=on-failure +RestartSec=5s +StandardOutput=append:/var/log/icecast-irc-bot/bot.log +StandardError=append:/var/log/icecast-irc-bot/error.log + +# Security settings +NoNewPrivileges=yes +ProtectSystem=full +ProtectHome=yes +PrivateTmp=yes + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..8b89e05 --- /dev/null +++ b/install.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# Exit on any error +set -e + +# Check if running as root +if [ "$EUID" -ne 0 ]; then + echo "Please run as root" + exit 1 +fi + +# Create icecast-bot user and group if they don't exist +if ! getent group icecast-bot >/dev/null; then + groupadd icecast-bot +fi +if ! getent passwd icecast-bot >/dev/null; then + useradd -r -g icecast-bot -s /bin/false icecast-bot +fi + +# Create necessary directories +mkdir -p /etc/icecast-irc-bot +mkdir -p /var/log/icecast-irc-bot +mkdir -p /opt/icecast-irc-bot + +# Set ownership +chown icecast-bot:icecast-bot /etc/icecast-irc-bot +chown icecast-bot:icecast-bot /var/log/icecast-irc-bot +chown icecast-bot:icecast-bot /opt/icecast-irc-bot + +# Create and activate virtual environment +python3 -m venv /opt/icecast-irc-bot/venv +export VIRTUAL_ENV="/opt/icecast-irc-bot/venv" +export PATH="$VIRTUAL_ENV/bin:$PATH" +unset PYTHONHOME + +# Install Python dependencies in virtual environment +pip3 install --upgrade pip +pip3 install hatchling +pip3 install -r requirements.txt +pip3 install . + +# Install manager script +cp icecast-irc-bot-manager.py /usr/local/bin/icecast-irc-bot-manager +chmod +x /usr/local/bin/icecast-irc-bot-manager + +# Update manager script shebang to use venv Python +sed -i "1c#\!$VIRTUAL_ENV/bin/python3" /usr/local/bin/icecast-irc-bot-manager + +# Install service file +cp icecast-irc-bot-manager.service /etc/systemd/system/ +systemctl daemon-reload + +# Copy example config if it doesn't exist +if [ ! -f /etc/icecast-irc-bot/config.yaml ]; then + cp config.yaml.example /etc/icecast-irc-bot/config.yaml + chown icecast-bot:icecast-bot /etc/icecast-irc-bot/config.yaml + chmod 640 /etc/icecast-irc-bot/config.yaml +fi + +echo "Installation complete!" +echo +echo "Next steps:" +echo "1. Configure your bot(s) in /etc/icecast-irc-bot/config.yaml" +echo "2. Start the service: systemctl start icecast-irc-bot-manager" +echo "3. Enable at boot: systemctl enable icecast-irc-bot-manager" +echo +echo "Usage:" +echo "icecast-irc-bot-manager list # List running bots" +echo "icecast-irc-bot-manager start --config FILE # Start a bot" +echo "icecast-irc-bot-manager stop BOT_ID # Stop a bot" +echo "icecast-irc-bot-manager restart BOT_ID # Restart a bot" \ No newline at end of file diff --git a/main.py b/main.py index 6702a00..a1b6e76 100644 --- a/main.py +++ b/main.py @@ -11,31 +11,132 @@ class AsifFilter(logging.Filter): record.name.startswith('asif') or (isinstance(record.msg, str) and record.msg.startswith('Joined channel'))) +# ANSI color codes +class Colors: + COLORS = [ + '\033[94m', # BLUE + '\033[96m', # CYAN + '\033[92m', # GREEN + '\033[93m', # YELLOW + '\033[95m', # MAGENTA + '\033[91m', # RED + ] + ENDC = '\033[0m' + BOLD = '\033[1m' + + # Additional colors for log levels + GREY = '\033[37m' + WHITE = '\033[97m' + RED = '\033[91m' + ORANGE = '\033[38;5;208m' # Using 256-color code for orange + CYAN = '\033[96m' + MAGENTA = '\033[95m' + + # Track used colors + used_colors = {} # botname -> color mapping + + @classmethod + def get_color_for_bot(cls, botname: str) -> str: + """Get a consistent color for a bot based on its name.""" + # If this bot already has a color, return it + if botname in cls.used_colors: + return cls.used_colors[botname] + + # If we still have unused colors, use the next available one + unused_colors = [c for c in cls.COLORS if c not in cls.used_colors.values()] + if unused_colors: + color = unused_colors[0] + else: + # If we're out of unique colors, fall back to hash-based selection + color = cls.COLORS[hash(botname) % len(cls.COLORS)] + + cls.used_colors[botname] = color + return color + +class ColoredLevelFormatter(logging.Formatter): + """Custom formatter that adds colors to log levels and ensures fixed width.""" + + # Color mapping for different log levels + LEVEL_COLORS = { + 'DEBUG': Colors.CYAN, + 'INFO': Colors.WHITE, + 'WARNING': Colors.ORANGE, + 'ERROR': Colors.RED, + 'CRITICAL': Colors.MAGENTA + } + + def format(self, record): + # Save original level name + original_level = record.levelname + # Pad level name to 8 characters (length of 'WARNING') + colored_level = f"{self.LEVEL_COLORS.get(original_level, Colors.WHITE)}{original_level:8}{Colors.ENDC}" + record.levelname = colored_level + result = super().format(record) + # Restore original level name + record.levelname = original_level + return result + # Defer logging setup until after config is loaded -def setup_logging(log_level: str = 'INFO'): - """Set up logging with the specified level.""" +def setup_logging(name: str, config: dict) -> logging.Logger: + """Set up logging with the specified configuration. + + Args: + name: The name for this logger instance + config: The logging configuration dictionary containing: + - level: The log level to use + - format: The log format template + - datefmt: The date format template + + Returns: + logging.Logger: The configured logger instance + """ + # Get logging configuration with defaults + log_config = config.get('logging', {}) + log_level = log_config.get('level', 'INFO') + log_format = log_config.get('format', '%(asctime)s - %(levelname)s - %(message)s') + date_format = log_config.get('datefmt', '%H:%M:%S') + # Convert string level to logging constant numeric_level = getattr(logging, log_level.upper(), logging.INFO) - # Set up base logging configuration - logging.basicConfig( - level=numeric_level, - format='%(asctime)s - %(levelname)s - %(message)s', - datefmt='%H:%M:%S' + # Create a logger with the given name + logger = logging.getLogger(name) + logger.setLevel(numeric_level) + + # Create console handler with colored formatter + console_handler = logging.StreamHandler() + console_handler.setFormatter( + ColoredLevelFormatter(log_format, datefmt=date_format) ) - - # Apply filter to root logger - logging.getLogger().addFilter(AsifFilter()) - - # Configure asif's logger - asif_logger = logging.getLogger('asif') - asif_logger.addFilter(AsifFilter()) - asif_logger.setLevel(logging.CRITICAL) - asif_logger.propagate = False + + # Add AsifFilter to the logger + logger.addFilter(AsifFilter()) + + # Add the console handler to logger + logger.addHandler(console_handler) + + # Prevent propagation to root logger + logger.propagate = False + + return logger # Set up error logging for asif -error_handler = logging.FileHandler('ERROR.log') -error_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) +def create_error_handler(bot_name: str, config: dict) -> logging.FileHandler: + """Create an error handler for the given bot instance. + + Args: + bot_name: The name of the bot instance + config: The logging configuration dictionary + """ + handler = logging.FileHandler(f'ERROR_{bot_name}.log') + # Get error log format from config or use default + log_config = config.get('logging', {}) + error_format = log_config.get('error_format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + date_format = log_config.get('datefmt', '%H:%M:%S') + + # Use the colored formatter for error logs too + handler.setFormatter(ColoredLevelFormatter(error_format, datefmt=date_format)) + return handler from asif import Client @@ -52,9 +153,10 @@ def patch_client_bg(self, coro): return asyncio.ensure_future(runner()) # Patch asif's Client class to use error logger and patched _bg -def silent_client_init(self, *args, **kwargs): +def silent_client_init(self, *args, bot_name: str = None, config: dict = None, **kwargs): # Create a logger that writes to ERROR.log - error_logger = logging.getLogger('asif.client') + error_logger = logging.getLogger(f'asif.client.{bot_name}' if bot_name else 'asif.client') + error_handler = create_error_handler(bot_name, config) if bot_name else logging.FileHandler('ERROR.log') error_logger.addHandler(error_handler) error_logger.propagate = False # Don't send to console error_logger.setLevel(logging.INFO) # Capture all messages @@ -82,40 +184,10 @@ import yaml from pathlib import Path from typing import List, Optional import sys - -# ANSI color codes -class Colors: - COLORS = [ - '\033[94m', # BLUE - '\033[96m', # CYAN - '\033[92m', # GREEN - '\033[93m', # YELLOW - '\033[95m', # MAGENTA - '\033[91m', # RED - ] - ENDC = '\033[0m' - BOLD = '\033[1m' - - # Track used colors - used_colors = {} # botname -> color mapping - - @classmethod - def get_color_for_bot(cls, botname: str) -> str: - """Get a consistent color for a bot based on its name.""" - # If this bot already has a color, return it - if botname in cls.used_colors: - return cls.used_colors[botname] - - # If we still have unused colors, use the next available one - unused_colors = [c for c in cls.COLORS if c not in cls.used_colors.values()] - if unused_colors: - color = unused_colors[0] - else: - # If we're out of unique colors, fall back to hash-based selection - color = cls.COLORS[hash(botname) % len(cls.COLORS)] - - cls.used_colors[botname] = color - return color +import inspect +import os +import socket +import tempfile class BotLoggerAdapter(logging.LoggerAdapter): # Class variables to track maximum lengths @@ -165,34 +237,149 @@ class BotLoggerAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): return f'[{self.colored_botname}] {msg}', kwargs -# Add flag file constant at the top level -RESTART_FLAG_FILE = ".restart_flag" +class RestartManager: + """Manages bot restarts using Unix Domain Sockets. + + This class provides a clean way to handle bot restarts without using + flag files. Each bot instance gets its own Unix Domain Socket. + """ + + def __init__(self, bot_id: str): + """Initialize the restart manager. + + Args: + bot_id: Unique identifier for this bot instance + """ + self.bot_id = bot_id + self.socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock" + self.server = None + self.should_restart = False + + async def start(self): + """Start the restart manager server.""" + # Clean up any existing socket + if self.socket_path.exists(): + self.socket_path.unlink() + + # Create the Unix Domain Socket server + self.server = await asyncio.start_unix_server( + self._handle_restart_request, + str(self.socket_path) + ) + + async def _handle_restart_request(self, reader, writer): + """Handle an incoming restart request.""" + try: + data = await reader.read() + if data == b'restart': + self.should_restart = True + writer.close() + await writer.wait_closed() + except Exception as e: + logging.error(f"Error handling restart request: {e}") + + def cleanup(self): + """Clean up the socket file.""" + try: + if self.server: + self.server.close() + if self.socket_path.exists(): + self.socket_path.unlink() + except Exception as e: + logging.error(f"Error cleaning up restart manager: {e}") + + @staticmethod + async def signal_restart(bot_id: str): + """Signal a specific bot to restart. + + Args: + bot_id: ID of the bot to restart + """ + socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock" + try: + reader, writer = await asyncio.open_unix_connection(str(socket_path)) + writer.write(b'restart') + await writer.drain() + writer.close() + await writer.wait_closed() + except Exception as e: + logging.error(f"Error signaling restart to bot {bot_id}: {e}") class IcecastBot: - VERSION = "1.0.1" + def __init__(self, config_path: Optional[str] = None): + # Store config path for potential restarts + self.config_path = config_path + + # Load config + self.config = self.load_config(config_path) + ... + +class IcecastBot: + """An IRC bot that monitors an Icecast stream and announces track changes. + + This bot connects to an IRC server and channel, monitors a specified Icecast stream + for metadata changes, and announces new tracks. It supports various commands for + both regular users and administrators. + + Features: + - Configurable via YAML files + - Multiple URL patterns for metadata fetching + - Automatic reconnection and error recovery + - Admin commands with permission system + - Customizable help messages and announcements + """ + + def get_version(): + """Get the current version from VERSION file.""" + with open('VERSION') as f: + return f.read().strip() + + VERSION = get_version() def __init__(self, config_path: Optional[str] = None): + """Initialize the bot with the given configuration. + + Args: + config_path: Path to the YAML configuration file. If None, uses default path. + """ + # Store config path for potential restarts + self.config_path = config_path + # Load config self.config = self.load_config(config_path) + # Create unique bot ID from nick and endpoint + self.bot_id = f"{self.config['irc']['nick']}_{self.config['stream']['endpoint']}" + + # Initialize restart manager + self.restart_manager = RestartManager(self.bot_id) + # Set up bot-specific logger + bot_name = f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}' + base_logger = setup_logging( + f'icecast_bot.{bot_name}', + self.config + ) + self.logger = BotLoggerAdapter( - logging.getLogger(__name__), - {'botname': f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}'} + base_logger, + {'botname': bot_name} ) self.logger.info(f"Starting Icecast IRC Bot v{self.VERSION}") - # Initialize IRC bot with config + # Initialize IRC bot with config and bot name for logging self.bot = Client( host=self.config['irc']['host'], port=self.config['irc']['port'], user=self.config['irc']['user'], realname=self.config['irc']['realname'], - nick=self.config['irc']['nick'] + nick=self.config['irc']['nick'], + bot_name=bot_name, + config=self.config ) - # Set up instance variables from config + # Set up instance variables self.channel_name = self.config['irc']['channel'] self.stream_url = self.config['stream']['url'] self.stream_endpoint = self.config['stream']['endpoint'] @@ -208,21 +395,42 @@ class IcecastBot: self.require_nick_prefix = self.config.get('commands', {}).get('require_nick_prefix', False) self.allow_private_commands = self.config.get('commands', {}).get('allow_private_commands', False) + # Get help format templates + help_config = self.config.get('help', {}) + self.help_specific_format = help_config.get('specific_format', "\x02{prefix}{cmd}\x02: {desc}") + self.help_list_format = help_config.get('list_format', "(\x02{cmd}\x02, {desc})") + self.help_list_separator = help_config.get('list_separator', " | ") + # Control flags self.monitor_task = None self.should_monitor = True self.is_monitoring = False - self.admin_users = self.config.get('admin', {}).get('users', ['*']) # '*' means anyone can use admin commands - self.current_process = None # Track the current subprocess - - # Create a unique restart flag file for this bot instance - self.restart_flag_file = f".restart_flag_{self.config['irc']['nick']}_{self.stream_endpoint}" + self.admin_users = self.config.get('admin', {}).get('users', ['*']) + self.current_process = None + self.should_exit = False + # Initialize command mappings + self.pattern_to_method = {} + self.method_to_pattern = {} + self.admin_commands = set() + self.command_handlers = {} + + # Set up handlers self.setup_handlers() + + # Build command mappings + self._build_command_mappings() @staticmethod def load_config(config_path: Optional[str] = None) -> dict: - """Load configuration from file and/or command line arguments.""" + """Load and validate the bot configuration from a YAML file. + + Args: + config_path: Path to the YAML configuration file. If None, uses default path. + + Returns: + dict: The loaded and validated configuration dictionary with default values applied. + """ if config_path is None: config_path = Path(__file__).parent / 'config.yaml' @@ -231,9 +439,6 @@ class IcecastBot: with open(config_path) as f: config = yaml.safe_load(f) except FileNotFoundError: - # Create a temporary logger for config loading - temp_logger = logging.getLogger(__name__) - temp_logger.warning(f"Config file not found at {config_path}, using defaults") config = { 'irc': {}, 'stream': {}, @@ -242,19 +447,47 @@ class IcecastBot: 'ignore_patterns': ['Unknown', 'Unable to fetch metadata'] }, 'logging': { - 'level': 'INFO' + 'level': 'INFO', + 'format': '%(asctime)s - %(levelname)s - %(message)s', + 'error_format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + 'datefmt': '%H:%M:%S' } } - - # Set up logging with configured level - setup_logging(config.get('logging', {}).get('level', 'INFO')) + + # Ensure logging config exists with defaults + if 'logging' not in config: + config['logging'] = {} + if 'format' not in config['logging']: + config['logging']['format'] = '%(asctime)s - %(levelname)s - %(message)s' + if 'error_format' not in config['logging']: + config['logging']['error_format'] = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + if 'datefmt' not in config['logging']: + config['logging']['datefmt'] = '%H:%M:%S' + if 'level' not in config['logging']: + config['logging']['level'] = 'INFO' + return config def should_announce_song(self, song: str) -> bool: - """Check if the song should be announced based on ignore patterns.""" + """Check if a song should be announced based on configured ignore patterns. + + Args: + song: The song title to check. + + Returns: + bool: True if the song should be announced, False if it matches any ignore patterns. + """ return not any(pattern.lower() in song.lower() for pattern in self.ignore_patterns) def setup_handlers(self): + """Set up all IRC event handlers and command patterns. + + This method configures: + - Connection and join handlers + - Command pattern creation + - Message debugging + - Command handlers (np, help, admin commands) + """ @self.bot.on_connected() async def connected(): try: @@ -273,7 +506,17 @@ class IcecastBot: self.channel = channel def create_command_pattern(cmd: str) -> re.Pattern: - """Create a regex pattern for a command that handles nick prefixes.""" + """Create a regex pattern for matching IRC commands. + + The pattern handles optional nick prefixes (if configured) and + command prefixes. + + Args: + cmd: The command name without prefix. + + Returns: + re.Pattern: Compiled regex pattern for matching the command. + """ if self.require_nick_prefix: pattern = f"^({self.config['irc']['nick']}[:,] )?{re.escape(self.cmd_prefix)}{cmd}($| .*$)" else: @@ -284,7 +527,11 @@ class IcecastBot: # Global message handler for debugging @self.bot.on_message() async def debug_message(message): - """Debug handler to log all messages""" + """Debug handler for logging all received messages. + + Args: + message: The IRC message object to log. + """ try: # Log full message details only at DEBUG level self.logger.debug( @@ -308,6 +555,14 @@ class IcecastBot: # Command handlers with additional debug logging @self.bot.on_message(create_command_pattern('np')) async def now_playing(message): + """!np: Show the currently playing song + + Displays the current song title from the Icecast stream. + The song title is fetched from the stream's metadata or JSON status. + + Args: + message: IRC message object + """ self.logger.debug(f"np handler called with message: {getattr(message, 'text', None)!r}") recipient = getattr(message, 'recipient', None) @@ -327,9 +582,18 @@ class IcecastBot: self.logger.debug("Could not send np reply - invalid recipient") except Exception as e: self.logger.error(f"Error in np handler: {e}", exc_info=True) + self.command_handlers['now_playing'] = now_playing @self.bot.on_message(create_command_pattern('help')) async def help_command(message): + """!help: Show available commands + + Display all available commands or detailed help for a specific command. + Usage: !help [command] + + Args: + message: IRC message object + """ self.logger.debug(f"help handler called with message: {getattr(message, 'text', None)!r}") recipient = getattr(message, 'recipient', None) @@ -340,23 +604,92 @@ class IcecastBot: if not self.allow_private_commands and not is_channel: self.logger.debug("Ignoring private help command") return - + try: - prefix = self.cmd_prefix - help_text = ( - f"Icecast IRC Bot v{self.VERSION}\n" - f"Available commands:\n" - f"{prefix}np - Show current song\n" - f"{prefix}help - Show this help message\n" - ) - if self.is_admin(message.sender): - help_text += ( - f"{prefix}start - Start stream monitoring\n" - f"{prefix}stop - Stop stream monitoring\n" - f"{prefix}reconnect - Reconnect to stream\n" - f"{prefix}restart - Restart the bot\n" - f"{prefix}quit - Shutdown the bot\n" - ) + # Parse message to check if a specific command was requested + msg_text = getattr(message, 'text', '') + parts = msg_text.strip().split() + + self.logger.debug(f"Available command patterns: {list(self.pattern_to_method.keys())}") + self.logger.debug(f"Admin commands: {list(self.admin_commands)}") + + if len(parts) > 1: + # Specific command help requested + pattern = parts[1].lower() + if pattern.startswith(self.cmd_prefix): + pattern = pattern[len(self.cmd_prefix):] # Remove prefix if included + + self.logger.debug(f"Looking up help for command pattern: {pattern}") + + # Find the command handler using our mapping + method_name = self.pattern_to_method.get(pattern) + self.logger.debug(f"Found method name: {method_name}") + + if method_name: + handler = self.command_handlers.get(method_name) + if handler and handler.__doc__: + # Get the first line of the docstring + first_line = handler.__doc__.strip().split('\n')[0] + # Format it using the template and add (admin only) if needed + desc = first_line.split(':', 1)[1].strip() + if pattern in self.admin_commands: + desc = f"{desc} (admin only)" + help_text = self.help_specific_format.format( + prefix=self.cmd_prefix, + cmd=pattern, + desc=desc + ) + + # Check if user has permission for this command + if pattern in self.admin_commands and not self.is_admin(message.sender): + help_text = "You don't have permission to use this command." + else: + help_text = f"No help available for command: {pattern}" + else: + help_text = f"Unknown command: {pattern}" + else: + # Build command list with proper formatting + formatted_groups = [] + + # Add general commands first + general_commands = [] + for pattern, method_name in self.pattern_to_method.items(): + if pattern not in self.admin_commands: # If not an admin command + handler = self.command_handlers.get(method_name) + if handler and handler.__doc__: + first_line = handler.__doc__.strip().split('\n')[0] + desc = first_line.split(':', 1)[1].strip() + general_commands.append( + self.help_list_format.format( + cmd=f"{self.cmd_prefix}{pattern}", + desc=desc + ) + ) + if general_commands: + formatted_groups.append(self.help_list_separator.join(general_commands)) + + # Add admin commands if user is admin + if self.is_admin(message.sender): + admin_commands = [] + for pattern in sorted(self.admin_commands): + method_name = self.pattern_to_method.get(pattern) + if method_name: + handler = self.command_handlers.get(method_name) + if handler and handler.__doc__: + first_line = handler.__doc__.strip().split('\n')[0] + desc = first_line.split(':', 1)[1].strip() + # Don't add (admin only) in the list view + admin_commands.append( + self.help_list_format.format( + cmd=f"{self.cmd_prefix}{pattern}", + desc=desc + ) + ) + if admin_commands: + formatted_groups.append("Admin: " + self.help_list_separator.join(admin_commands)) + + help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | " + " | ".join(formatted_groups) + if is_channel: await recipient.message(help_text) self.logger.debug("Successfully sent help reply") @@ -364,112 +697,160 @@ class IcecastBot: self.logger.debug("Could not send help reply - invalid recipient") except Exception as e: self.logger.error(f"Error in help handler: {e}", exc_info=True) + self.command_handlers['help_command'] = help_command @self.bot.on_message(create_command_pattern('restart')) + @self.admin_required async def restart_bot(message): - recipient = getattr(message, 'recipient', None) - is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') - - if not self.allow_private_commands and not is_channel: - return - if not self.is_admin(message.sender): - if is_channel: - await recipient.message("You don't have permission to use this command.") - return - + """!restart: Restart the bot (admin only) + + Gracefully shuts down the bot and signals the bot.sh script + to restart it. This ensures a clean restart. + + Args: + message: IRC message object + """ self.logger.info(f"Restart command received from {message.sender}") await self.stop_monitoring() try: await self.bot.quit("Restarting...") - with open(self.restart_flag_file, 'w') as f: - f.write('restart') + # Signal restart through Unix Domain Socket + await RestartManager.signal_restart(self.bot_id) + self.should_exit = True except Exception as e: self.logger.error(f"Error during restart: {e}") - - sys.exit(0) + self.command_handlers['restart_bot'] = restart_bot @self.bot.on_message(create_command_pattern('quit')) + @self.admin_required async def quit_bot(message): - recipient = getattr(message, 'recipient', None) - is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') - - if not self.allow_private_commands and not is_channel: - return - if not self.is_admin(message.sender): - if is_channel: - await recipient.message("You don't have permission to use this command.") - return - + """!quit: Shutdown the bot (admin only) + + Gracefully shuts down the bot and exits without restarting. + + Args: + message: IRC message object + """ self.logger.info(f"Quit command received from {message.sender}") await self.stop_monitoring() try: await self.bot.quit("Shutting down...") + self.should_exit = True except Exception as e: self.logger.error(f"Error during shutdown: {e}") - - sys.exit(0) + self.command_handlers['quit_bot'] = quit_bot @self.bot.on_message(create_command_pattern('reconnect')) + @self.admin_required async def reconnect_stream(message): - recipient = getattr(message, 'recipient', None) - is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') - - if not self.allow_private_commands and not is_channel: - return - if not self.is_admin(message.sender): - if is_channel: - await recipient.message("You don't have permission to use this command.") - return - + """!reconnect: Reconnect to the stream (admin only) + + Attempts to reconnect to the stream and verifies the connection. + Reports success or failure back to the channel. + + Args: + message: IRC message object + """ self.logger.info(f"Reconnect command received from {message.sender}") success = await self.restart_monitoring() - if not success and is_channel: - await recipient.message("Stream reconnection may have failed. Check logs for details.") + if not success and hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): + await message.recipient.message("Stream reconnection may have failed. Check logs for details.") + self.command_handlers['reconnect_stream'] = reconnect_stream @self.bot.on_message(create_command_pattern('stop')) + @self.admin_required async def stop_monitoring(message): - recipient = getattr(message, 'recipient', None) - is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') - - if not self.allow_private_commands and not is_channel: - return - if not self.is_admin(message.sender): - if is_channel: - await recipient.message("You don't have permission to use this command.") - return - + """!stop: Stop stream monitoring (admin only) + + Stops monitoring the stream for metadata changes. + The bot remains connected to IRC. + + Args: + message: IRC message object + """ self.logger.info(f"Stop command received from {message.sender}") await self.stop_monitoring() - if is_channel: - await recipient.message("Stream monitoring stopped.") + if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): + await message.recipient.message("Stream monitoring stopped.") + self.command_handlers['stop_monitoring'] = stop_monitoring @self.bot.on_message(create_command_pattern('start')) + @self.admin_required async def start_monitoring(message): - recipient = getattr(message, 'recipient', None) - is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') - - if not self.allow_private_commands and not is_channel: - return - if not self.is_admin(message.sender): - if is_channel: - await recipient.message("You don't have permission to use this command.") - return - + """!start: Start stream monitoring (admin only) + + Starts monitoring the stream for metadata changes. + Will announce new songs in the channel. + + Args: + message: IRC message object + """ self.logger.info(f"Start command received from {message.sender}") await self.start_monitoring() - if is_channel: - await recipient.message("Stream monitoring started.") + if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): + await message.recipient.message("Stream monitoring started.") + self.command_handlers['start_monitoring'] = start_monitoring + + def _build_command_mappings(self): + """Build bidirectional mappings between command patterns and method names. + + Creates two mappings: + - pattern_to_method: Maps IRC command pattern (e.g. 'np') to method name + - method_to_pattern: Maps method name to IRC command pattern + + Command patterns are extracted from the first line of each handler's docstring, + which must be in the format "!command: description". + """ + self.logger.debug("Building command mappings...") + + # Get all command handlers + for method_name, handler in self.command_handlers.items(): + if not handler.__doc__: + continue + + self.logger.debug(f"Checking handler: {method_name}") + + # Check if this is a command handler by looking at the docstring + if handler.__doc__.strip().startswith('!'): + # Extract command pattern from docstring + first_line = handler.__doc__.strip().split('\n')[0] + pattern = first_line.split(':', 1)[0].strip('!') + + self.logger.debug(f"Found command in docstring: {pattern} -> {method_name}") + + # Store both mappings + self.pattern_to_method[pattern] = method_name + self.method_to_pattern[method_name] = pattern + + # Check if this is an admin command by looking at the decorator + if hasattr(handler, '__closure__') and handler.__closure__: + for cell in handler.__closure__: + if isinstance(cell.cell_contents, type(self.admin_required)): + self.admin_commands.add(pattern) + self.logger.debug(f"Marked {pattern} as admin command") + break + + self.logger.debug(f"Mapped command pattern '{pattern}' to method '{method_name}'") + + # Log all mappings for debugging + self.logger.debug("Final command mappings:") + for pattern, method in self.pattern_to_method.items(): + self.logger.debug(f" {pattern} -> {method}") + self.logger.debug("Admin commands:") + for cmd in sorted(self.admin_commands): + self.logger.debug(f" {cmd}") def is_admin(self, user: str) -> bool: """Check if a user has admin privileges. Args: - user: Full IRC user string (nickname!username@hostname) or User object + user: Full IRC user string (nickname!username@hostname) or User object. + Returns: - bool: True if user has admin privileges + bool: True if user has admin privileges, False otherwise. """ try: if hasattr(user, 'name'): @@ -483,7 +864,11 @@ class IcecastBot: return '*' in self.admin_users or nickname in self.admin_users async def start_monitoring(self): - """Start the metadata monitoring task.""" + """Start the metadata monitoring task. + + Creates an asyncio task to monitor the stream for metadata changes. + Only starts if not already monitoring. + """ if not self.is_monitoring: self.should_monitor = True self.monitor_task = asyncio.create_task(self.monitor_metadata()) @@ -491,7 +876,11 @@ class IcecastBot: self.logger.info("Started metadata monitoring task") async def stop_monitoring(self): - """Stop the metadata monitoring task.""" + """Stop the metadata monitoring task. + + Terminates the curl subprocess if running and cancels the monitoring task. + Ensures proper cleanup of resources. + """ self.should_monitor = False self.is_monitoring = False @@ -523,7 +912,14 @@ class IcecastBot: self.logger.info("Stopped metadata monitoring task") async def restart_monitoring(self): - """Restart the metadata monitoring task and verify the reconnection.""" + """Restart the metadata monitoring task and verify the reconnection. + + Stops the current monitoring task, starts a new one, and verifies + that metadata can be fetched successfully. + + Returns: + bool: True if reconnection was successful and verified, False otherwise. + """ await self.stop_monitoring() # Store the channel for status updates @@ -556,7 +952,14 @@ class IcecastBot: return False async def fetch_json_metadata(self): - """Fetch metadata from the Icecast JSON status endpoint.""" + """Fetch metadata from the Icecast JSON status endpoint. + + Tries multiple URL patterns to find the correct status endpoint. + Handles connection errors and JSON parsing gracefully. + + Returns: + str: The current song title, or an error message if fetching failed. + """ try: # Try different URL patterns base_urls = [ @@ -602,6 +1005,12 @@ class IcecastBot: return "Error fetching metadata" async def monitor_metadata(self): + """Monitor the Icecast stream for metadata changes. + + Uses curl to read the stream and detect metadata changes. + Handles stream errors, reconnection, and health checks. + Announces new songs when detected. + """ await asyncio.sleep(5) while self.should_monitor: @@ -718,7 +1127,16 @@ class IcecastBot: await asyncio.sleep(5) async def announce_song(self, song: str): - """Announce song if it doesn't match any ignore patterns.""" + """Announce a song in the IRC channel. + + Args: + song: The song title to announce. + + Only announces if: + - The channel object is valid + - The song doesn't match any ignore patterns + - The announcement format is configured + """ try: if self.channel and self.should_announce_song(song): # Use the stored channel object directly @@ -731,15 +1149,142 @@ class IcecastBot: self.logger.error(f"Error announcing song: {e}", exc_info=True) async def start(self): - await self.bot.run() + """Start the IRC bot and begin processing events.""" + try: + # Start the restart manager + await self.restart_manager.start() + + # Start the bot + await self.bot.run() + finally: + if self.should_exit: + # Clean up any remaining tasks + if self.monitor_task: + self.monitor_task.cancel() + try: + await self.monitor_task + except asyncio.CancelledError: + pass + except Exception as e: + self.logger.error(f"Error during cleanup: {e}") + + # Clean up restart manager + self.restart_manager.cleanup() + + def format_help_section(self, section_config: dict, prefix: str) -> List[str]: + """Format a help section according to the template. + + Extracts the first line of each command's docstring to use as the + help text. Falls back to the template's command descriptions if + docstring is not available. + + Args: + section_config: Configuration dictionary for the section. + prefix: Command prefix to use. + + Returns: + List[str]: List of formatted help lines for each command. + """ + commands = [] + for cmd, desc in section_config['commands'].items(): + # Try to get the docstring from the command handler + handler = None + for name, method in inspect.getmembers(self, predicate=inspect.ismethod): + if name.endswith(f"_{cmd}"): # e.g. now_playing for np + handler = method + break + + if handler and handler.__doc__: + # Extract the first line of the docstring + first_line = handler.__doc__.strip().split('\n')[0] + # Remove the command prefix and colon + desc = first_line.split(':', 1)[1].strip() + + commands.append(section_config['format'].format( + cmd=f"\x02{prefix}{cmd}\x02", # Bold the command + desc=desc + )) + return commands + + async def help_command_fallback(self, message): + """Fallback help command implementation using hardcoded format. + + Used when the help template is not configured or fails to process. + + Args: + message: The IRC message object that triggered this command. + """ + try: + prefix = self.cmd_prefix + # Format commands with bold prefix and aligned descriptions + base_cmds = f"\x02{prefix}np\x02 (current song) • \x02{prefix}help\x02 (this help)" + help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | Commands: {base_cmds}" + + if self.is_admin(message.sender): + admin_cmds = ( + f"\x02{prefix}start\x02/\x02stop\x02 (monitoring) • " + f"\x02{prefix}reconnect\x02 (stream) • " + f"\x02{prefix}restart\x02 (bot) • " + f"\x02{prefix}quit\x02 (shutdown)" + ) + help_text += f" | Admin: {admin_cmds}" + + if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'): + await message.recipient.message(help_text) + self.logger.debug("Successfully sent fallback help reply") + except Exception as e: + self.logger.error(f"Error in help fallback handler: {e}", exc_info=True) + + def admin_required(self, f): + """Decorator to mark a command as requiring admin privileges. + + Also automatically adds the command to the admin_commands set + for help message grouping. + + Args: + f: The command handler function to wrap. + + Returns: + The wrapped function that checks for admin privileges. + """ + async def wrapped(message, *args, **kwargs): + recipient = getattr(message, 'recipient', None) + is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#') + + if not self.allow_private_commands and not is_channel: + return + + if not self.is_admin(message.sender): + if is_channel: + await recipient.message("You don't have permission to use this command.") + return + + return await f(message, *args, **kwargs) + + # Copy the docstring and other attributes + wrapped.__doc__ = f.__doc__ + wrapped.__name__ = f.__name__ + + # Add the command pattern to admin_commands set + if f.__doc__ and f.__doc__.strip().startswith('!'): + pattern = f.__doc__.strip().split(':', 1)[0].strip('!') + self.admin_commands.add(pattern) + + return wrapped async def run_multiple_bots(config_paths: List[str]): - """Run multiple bot instances concurrently.""" + """Run multiple bot instances concurrently. + + Each bot runs independently and can be stopped without affecting others. + """ bots = [] + tasks = [] for config_path in config_paths: try: bot = IcecastBot(config_path) bots.append(bot) + # Create task for each bot + tasks.append(asyncio.create_task(run_single_bot(bot))) except Exception as e: logging.error(f"Failed to initialize bot with config {config_path}: {e}") @@ -747,7 +1292,39 @@ async def run_multiple_bots(config_paths: List[str]): logging.error("No bots were successfully initialized") return - await asyncio.gather(*(bot.start() for bot in bots)) + # Wait for all bots to complete + await asyncio.gather(*tasks) + +async def run_single_bot(bot: IcecastBot): + """Run a single bot instance. + + Args: + bot: The IcecastBot instance to run + """ + try: + await bot.start() + + # Check if we should restart this bot + if bot.restart_manager.should_restart: + # Clean up + bot.restart_manager.cleanup() + # Create and start a new instance + new_bot = IcecastBot(bot.config_path) + await run_single_bot(new_bot) + # If should_exit is True but should_restart is False, just exit cleanly + elif bot.should_exit: + bot.logger.info("Bot shutting down...") + bot.restart_manager.cleanup() + except Exception as e: + bot.logger.error(f"Error running bot {bot.config['irc']['nick']}: {e}") + finally: + # Ensure cleanup happens + if bot.monitor_task: + bot.monitor_task.cancel() + try: + await bot.monitor_task + except asyncio.CancelledError: + pass if __name__ == "__main__": parser = argparse.ArgumentParser(description='Icecast IRC Bot') @@ -763,16 +1340,6 @@ if __name__ == "__main__": args = parser.parse_args() - # Check for restart flag files at startup - try: - for flag_file in Path('.').glob('.restart_flag_*'): - try: - flag_file.unlink() - except Exception as e: - logging.error(f"Error removing restart flag file {flag_file}: {e}") - except Exception as e: - logging.error(f"Error handling restart flag files: {e}") - # Set up the event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -804,7 +1371,7 @@ if __name__ == "__main__": bot.config['commands'] = {} bot.config['commands']['prefix'] = args.cmd_prefix - await bot.start() + await run_single_bot(bot) except Exception as e: logging.error(f"Unhandled exception: {e}") sys.exit(1) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a919de9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,31 @@ +[project] +name = "icecast-irc-bot" +dynamic = ["version"] +description = "Icecast metadata IRC announcer bot" +authors = [ + {name = "cottongin", email = "cottongin@cottongin.xyz"}, +] +dependencies = [ + "asif", + "aiohttp", + "pyyaml", +] +requires-python = ">=3.11" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["."] + +[tool.hatch.build] +include = [ + "*.py", + "README*", + "LICENSE*", + "config.yaml.example" +] + +[tool.setuptools.dynamic] +version = {file = "VERSION"} \ No newline at end of file