1394 lines
57 KiB
Python

#!/usr/bin/env python3
# Set up logging configuration before imports
import logging
import os
class AsifFilter(logging.Filter):
    """Logging filter that suppresses chatter coming from the ``asif`` library.

    A record is dropped when it was emitted from a module named ``asif``,
    when its logger name starts with ``asif``, or when its raw message is a
    string beginning with ``"Joined channel"``.
    """

    def filter(self, record):
        """Return ``True`` to keep *record* and ``False`` to discard it."""
        if record.module == 'asif':
            return False
        if record.name.startswith('asif'):
            return False
        raw_message = record.msg
        if isinstance(raw_message, str) and raw_message.startswith('Joined channel'):
            return False
        return True
# ANSI color codes
class Colors:
    """ANSI escape sequences plus a per-bot color allocator."""

    # Palette handed out to bots, in allocation order.
    COLORS = [
        '\033[94m',  # BLUE
        '\033[96m',  # CYAN
        '\033[92m',  # GREEN
        '\033[93m',  # YELLOW
        '\033[95m',  # MAGENTA
        '\033[91m',  # RED
    ]
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    # Additional colors for log levels
    GREY = '\033[37m'
    WHITE = '\033[97m'
    RED = '\033[91m'
    ORANGE = '\033[38;5;208m'  # Using 256-color code for orange
    CYAN = '\033[96m'
    MAGENTA = '\033[95m'
    # botname -> color mapping; shared by the whole process on purpose so a
    # bot keeps the color it was first given.
    used_colors = {}

    @classmethod
    def get_color_for_bot(cls, botname: str) -> str:
        """Return the color assigned to *botname*, allocating one if needed.

        The first ``len(COLORS)`` distinct bot names each receive an unused
        palette entry in palette order.  Once the palette is exhausted the
        color is derived from a CRC32 of the name.  The built-in ``hash()``
        is salted per interpreter process for strings, so it cannot provide
        a color that stays the same across restarts.

        Args:
            botname: Identifier of the bot (``nick@endpoint`` in this file).

        Returns:
            The ANSI escape sequence selected for the bot.
        """
        import zlib  # local import: only needed for the overflow fallback

        try:
            return cls.used_colors[botname]
        except KeyError:
            pass

        taken = set(cls.used_colors.values())
        color = next((c for c in cls.COLORS if c not in taken), None)
        if color is None:
            # Palette exhausted: pick deterministically from the name.
            digest = zlib.crc32(botname.encode('utf-8'))
            color = cls.COLORS[digest % len(cls.COLORS)]

        cls.used_colors[botname] = color
        return color
class ColoredLevelFormatter(logging.Formatter):
    """Formatter that colors the level name and pads it to a fixed width."""

    # Color mapping for different log levels; unknown levels fall back to
    # white in ``format``.
    LEVEL_COLORS = {
        'DEBUG': Colors.CYAN,
        'INFO': Colors.WHITE,
        'WARNING': Colors.ORANGE,
        'ERROR': Colors.RED,
        'CRITICAL': Colors.MAGENTA
    }

    def format(self, record):
        """Format *record* with a colored, 8-character-wide level name.

        ``record.levelname`` is replaced only for the duration of the call.
        The original value is restored in a ``finally`` block so that a
        failure inside the base formatter cannot leak ANSI codes to other
        handlers that process the same record.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            The formatted log line.
        """
        original_level = record.levelname
        color = self.LEVEL_COLORS.get(original_level, Colors.WHITE)
        # Pad to 8 characters, the length of 'CRITICAL' (the longest
        # standard level name).
        record.levelname = f"{color}{original_level:8}{Colors.ENDC}"
        try:
            return super().format(record)
        finally:
            record.levelname = original_level
# Defer logging setup until after config is loaded
def setup_logging(name: str, config: dict) -> logging.Logger:
    """Create or refresh the console logger for one bot instance.

    The function is safe to call more than once for the same *name*: the
    existing console handler and ``AsifFilter`` are reused instead of being
    stacked, which would otherwise print every line several times.

    Args:
        name: The name for this logger instance.
        config: The full bot configuration.  Its optional ``'logging'``
            mapping may contain:
            - level: Log level as a name (e.g. ``'DEBUG'``) or an integer
            - format: The log format template
            - datefmt: The date format template

    Returns:
        logging.Logger: The configured logger instance.
    """
    # ``or {}`` also covers an explicit null ``logging:`` entry.
    log_config = config.get('logging') or {}
    log_level = log_config.get('level', 'INFO')
    log_format = log_config.get('format', '%(asctime)s - %(levelname)s - %(message)s')
    date_format = log_config.get('datefmt', '%H:%M:%S')

    # Accept numeric levels as-is; resolve names, falling back to INFO when
    # the name is unknown or resolves to a non-level attribute of ``logging``.
    if isinstance(log_level, int):
        numeric_level = log_level
    else:
        numeric_level = getattr(logging, str(log_level).upper(), logging.INFO)
        if not isinstance(numeric_level, int):
            numeric_level = logging.INFO

    logger = logging.getLogger(name)
    logger.setLevel(numeric_level)

    formatter = ColoredLevelFormatter(log_format, datefmt=date_format)
    # Exact type check on purpose: FileHandler subclasses StreamHandler and
    # must not be mistaken for the console handler.
    console_handler = next(
        (
            handler for handler in logger.handlers
            if type(handler) is logging.StreamHandler
            and isinstance(handler.formatter, ColoredLevelFormatter)
        ),
        None,
    )
    if console_handler is None:
        console_handler = logging.StreamHandler()
        logger.addHandler(console_handler)
    # Always refresh the formatter so a changed config takes effect.
    console_handler.setFormatter(formatter)

    if not any(isinstance(existing, AsifFilter) for existing in logger.filters):
        logger.addFilter(AsifFilter())

    # Prevent propagation to root logger
    logger.propagate = False
    return logger
# Set up error logging for asif
def create_error_handler(bot_name: str, config: dict) -> logging.FileHandler:
    """Create the file handler that receives asif's log output for one bot.

    The log is written to ``ERROR_<bot_name>.log`` in the current working
    directory.  Path separators inside *bot_name* (for example a stream
    mount such as ``live/stream``) are replaced with ``_`` so the name
    cannot point into another directory; names without separators map to
    the same file as before.

    Args:
        bot_name: The name of the bot instance.
        config: The full bot configuration; its optional ``'logging'``
            mapping supplies ``error_format`` and ``datefmt``.

    Returns:
        logging.FileHandler: Handler with a ``ColoredLevelFormatter`` attached.
    """
    safe_name = str(bot_name)
    for separator in (os.sep, os.altsep):
        if separator:
            safe_name = safe_name.replace(separator, '_')
    handler = logging.FileHandler(f'ERROR_{safe_name}.log')

    # Get error log format from config or use default; ``or {}`` also covers
    # an explicit null ``logging:`` entry.
    log_config = config.get('logging') or {}
    error_format = log_config.get('error_format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    date_format = log_config.get('datefmt', '%H:%M:%S')

    # Use the colored formatter for error logs too
    handler.setFormatter(ColoredLevelFormatter(error_format, datefmt=date_format))
    return handler
from asif import Client
# Patch asif's Client class
def patch_client_bg(self, coro):
    """Schedule *coro* in the background, logging ordinary failures.

    Intended as a replacement for ``Client._bg``.  ``SystemExit`` is
    re-raised untouched; any other ``Exception`` raised by the coroutine is
    reported through ``self._log.exception`` and swallowed.

    Args:
        coro: The awaitable to run.

    Returns:
        The future returned by ``asyncio.ensure_future`` for the wrapper.
    """
    async def _guarded():
        try:
            await coro
        except SystemExit:
            # Let interpreter shutdown requests pass through.
            raise
        except Exception:
            self._log.exception("async: Coroutine raised exception")

    background = asyncio.ensure_future(_guarded())
    return background
# Patch asif's Client class to use error logger and patched _bg
def silent_client_init(self, *args, bot_name: str = None, config: dict = None, **kwargs):
    """Replacement ``Client.__init__`` that routes asif logging to a file.

    A logger named ``asif.client.<bot_name>`` (or ``asif.client`` when no
    name is given) is given a file handler, detached from the console, and
    stored on the instance as ``_log``.  ``_bg`` is replaced with
    ``patch_client_bg`` before the original constructor runs.

    The file handler is attached only when the logger has no handler yet.
    Without that guard every additional ``Client`` built for the same bot
    would open another handle on the same file and duplicate each line.

    Args:
        *args: Positional arguments forwarded to the original ``__init__``.
        bot_name: Optional bot identifier used for the logger and file name.
        config: Optional bot configuration passed to ``create_error_handler``;
            ``None`` is treated as an empty configuration.
        **kwargs: Keyword arguments forwarded to the original ``__init__``.
    """
    logger_name = f'asif.client.{bot_name}' if bot_name else 'asif.client'
    error_logger = logging.getLogger(logger_name)

    if not error_logger.handlers:
        if bot_name:
            error_handler = create_error_handler(bot_name, config or {})
        else:
            error_handler = logging.FileHandler('ERROR.log')
        error_logger.addHandler(error_handler)

    error_logger.propagate = False  # Don't send to console
    error_logger.setLevel(logging.INFO)  # INFO and above are captured

    # Store the logger
    self._log = error_logger
    # Bind the patched _bg to this instance
    self._bg = patch_client_bg.__get__(self)
    # Call the original __init__ (bot_name/config are consumed here and are
    # not forwarded)
    original_init(self, *args, **kwargs)
# Keep a reference to asif's own constructor so silent_client_init can
# delegate to it, then install the wrapper: every Client constructed after
# this point goes through silent_client_init first.
original_init = Client.__init__
Client.__init__ = silent_client_init
import asyncio
import re
import aiohttp
import json
import time
import argparse
import yaml
from pathlib import Path
from typing import List, Optional
import sys
import inspect
import os
import socket
import tempfile
class BotLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes each message with an aligned bot name.

    The prefix has the form ``[<nick>@<endpoint>]`` where the nick is
    right-aligned and the endpoint is colored and left-aligned.  Column
    widths are shared by all adapters so their prefixes line up.
    """

    # Widest nick / endpoint seen so far (visible characters only).
    max_nick_length = 0
    max_endpoint_length = 0
    # Every adapter created so far, so padding can be recomputed for all.
    instances = []

    def __init__(self, logger, extra):
        """Create the adapter.

        Args:
            logger: The underlying logger.
            extra: Mapping with a ``'botname'`` entry of the form
                ``nick@endpoint``.
        """
        super().__init__(logger, extra)
        self.nick, self.endpoint = extra['botname'].split('@')

        registry = BotLoggerAdapter
        widths_before = (registry.max_nick_length, registry.max_endpoint_length)
        registry.max_nick_length = max(registry.max_nick_length, len(self.nick))
        registry.max_endpoint_length = max(registry.max_endpoint_length, len(self.endpoint))
        widths_after = (registry.max_nick_length, registry.max_endpoint_length)

        # A wider column invalidates the padding of adapters created earlier.
        if widths_after != widths_before:
            for adapter in registry.instances:
                adapter.update_padding()

        registry.instances.append(self)
        self.update_padding()

    def update_padding(self):
        """Rebuild ``colored_botname`` using the current column widths."""
        left_pad = " " * (BotLoggerAdapter.max_nick_length - len(self.nick))
        right_pad = " " * (BotLoggerAdapter.max_endpoint_length - len(self.endpoint))
        endpoint_color = Colors.get_color_for_bot(self.nick + '@' + self.endpoint)
        pieces = [
            left_pad,
            self.nick,
            "@",
            Colors.BOLD,
            endpoint_color,
            self.endpoint,
            Colors.ENDC,
            right_pad,
        ]
        self.colored_botname = "".join(pieces)

    def process(self, msg, kwargs):
        """Prepend the bracketed bot name to *msg*; *kwargs* pass through."""
        prefixed = f'[{self.colored_botname}] {msg}'
        return prefixed, kwargs
class RestartManager:
    """Manages bot restarts using Unix Domain Sockets.

    This class provides a clean way to handle bot restarts without using
    flag files. Each bot instance gets its own Unix Domain Socket in the
    system temporary directory.
    """

    def __init__(self, bot_id: str):
        """Initialize the restart manager.

        Args:
            bot_id: Unique identifier for this bot instance.
        """
        self.bot_id = bot_id
        self.socket_path = self._socket_path_for(bot_id)
        self.server = None
        self.should_restart = False

    @staticmethod
    def _socket_path_for(bot_id: str) -> Path:
        """Return the socket path used for *bot_id*.

        Path separators in the id are replaced with ``_`` so an id derived
        from a stream mount such as ``live/stream`` still yields a file
        directly inside the temporary directory.  Ids without separators map
        to the same path as before.
        """
        safe_id = str(bot_id)
        for separator in (os.sep, os.altsep):
            if separator:
                safe_id = safe_id.replace(separator, '_')
        return Path(tempfile.gettempdir()) / f"icecast_bot_{safe_id}.sock"

    async def start(self):
        """Start the restart manager server."""
        # Remove a stale socket; tolerate it vanishing between check and use.
        try:
            self.socket_path.unlink()
        except FileNotFoundError:
            pass
        # Create the Unix Domain Socket server
        self.server = await asyncio.start_unix_server(
            self._handle_restart_request,
            str(self.socket_path)
        )

    async def _handle_restart_request(self, reader, writer):
        """Handle an incoming restart request.

        Reads until the peer closes its side and sets ``should_restart``
        when the payload is exactly ``b'restart'``.  The connection is
        closed even when reading fails.
        """
        try:
            data = await reader.read()
            if data == b'restart':
                self.should_restart = True
        except Exception as e:
            logging.error(f"Error handling restart request: {e}")
        finally:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception as e:
                logging.error(f"Error handling restart request: {e}")

    def cleanup(self):
        """Close the server, if any, and remove the socket file."""
        try:
            if self.server:
                self.server.close()
            if self.socket_path.exists():
                self.socket_path.unlink()
        except Exception as e:
            logging.error(f"Error cleaning up restart manager: {e}")

    @staticmethod
    async def signal_restart(bot_id: str):
        """Signal a specific bot to restart.

        Failures (for example no bot listening) are logged, not raised.

        Args:
            bot_id: ID of the bot to restart.
        """
        socket_path = RestartManager._socket_path_for(bot_id)
        try:
            _reader, writer = await asyncio.open_unix_connection(str(socket_path))
            try:
                writer.write(b'restart')
                await writer.drain()
            finally:
                # Closing signals EOF, which is what the server waits for.
                writer.close()
                await writer.wait_closed()
        except Exception as e:
            logging.error(f"Error signaling restart to bot {bot_id}: {e}")
# NOTE(review): abbreviated IcecastBot definition. The class statement that
# follows rebinds the name IcecastBot, so this version is not the one in
# effect once the module has finished loading -- presumably a leftover
# excerpt; confirm and remove.
class IcecastBot:
    def __init__(self, config_path: Optional[str] = None):
        # Store config path for potential restarts
        self.config_path = config_path
        # Load config
        self.config = self.load_config(config_path)
        ...
class IcecastBot:
"""An IRC bot that monitors an Icecast stream and announces track changes.
This bot connects to an IRC server and channel, monitors a specified Icecast stream
for metadata changes, and announces new tracks. It supports various commands for
both regular users and administrators.
Features:
- Configurable via YAML files
- Multiple URL patterns for metadata fetching
- Automatic reconnection and error recovery
- Admin commands with permission system
- Customizable help messages and announcements
"""
def get_version():
    """Return the bot version read from a ``VERSION`` file.

    The file is looked up in the current working directory first (the
    original behavior) and then next to this module.  This function runs
    while the class body is being executed, so a missing or unreadable file
    must not abort the import: ``"unknown"`` is returned instead.

    Returns:
        str: The stripped file content, or ``"unknown"`` if no readable
        VERSION file was found.
    """
    candidates = [Path('VERSION'), Path(__file__).resolve().parent / 'VERSION']
    for candidate in candidates:
        try:
            with open(candidate, encoding='utf-8') as f:
                return f.read().strip()
        except OSError:
            continue
    return "unknown"
VERSION = get_version()
def __init__(self, config_path: Optional[str] = None):
    """Build a bot instance from its configuration file.

    Loads the configuration, creates the restart manager, the bot-specific
    logger and the IRC client, copies the settings used at runtime onto the
    instance, and finally registers handlers and command mappings.

    Args:
        config_path: Path to the YAML configuration file. If None, uses default path.
    """
    # Remembered so a restart can reuse the same file.
    self.config_path = config_path
    self.config = self.load_config(config_path)

    irc_cfg = self.config['irc']
    nick = irc_cfg['nick']
    stream_cfg = self.config['stream']
    endpoint = stream_cfg['endpoint']

    # Identity: the id names the restart socket, the name labels log output.
    self.bot_id = f"{nick}_{endpoint}"
    self.restart_manager = RestartManager(self.bot_id)
    bot_name = f'{nick}@{endpoint}'

    self.logger = BotLoggerAdapter(
        setup_logging(f'icecast_bot.{bot_name}', self.config),
        {'botname': bot_name},
    )
    self.logger.info(f"Starting Icecast IRC Bot v{self.VERSION}")

    # IRC client; bot_name and config are consumed by the patched
    # Client.__init__ for its error logging.
    self.bot = Client(
        host=irc_cfg['host'],
        port=irc_cfg['port'],
        user=irc_cfg['user'],
        realname=irc_cfg['realname'],
        nick=nick,
        bot_name=bot_name,
        config=self.config,
    )

    # Runtime state and required settings.
    self.channel_name = irc_cfg['channel']
    self.stream_url = stream_cfg['url']
    self.stream_endpoint = endpoint
    self.current_song = "Unknown"
    announce_cfg = self.config['announce']
    self.reply = announce_cfg['format']
    self.ignore_patterns = announce_cfg['ignore_patterns']
    self.channel = None
    self.last_health_check = time.time()
    self.health_check_interval = stream_cfg['health_check_interval']

    # Optional command settings.
    commands_cfg = self.config.get('commands', {})
    self.cmd_prefix = commands_cfg.get('prefix', '!')
    self.require_nick_prefix = commands_cfg.get('require_nick_prefix', False)
    self.allow_private_commands = commands_cfg.get('allow_private_commands', False)

    # Optional help output templates.
    help_cfg = self.config.get('help', {})
    self.help_specific_format = help_cfg.get('specific_format', "\x02{prefix}{cmd}\x02: {desc}")
    self.help_list_format = help_cfg.get('list_format', "(\x02{cmd}\x02, {desc})")
    self.help_list_separator = help_cfg.get('list_separator', " | ")

    # Control flags; when no admin list is configured the wildcard '*'
    # is used.
    self.monitor_task = None
    self.should_monitor = True
    self.is_monitoring = False
    self.admin_users = self.config.get('admin', {}).get('users', ['*'])
    self.current_process = None
    self.should_exit = False

    # Command bookkeeping, filled in by the two calls below.
    self.pattern_to_method = {}
    self.method_to_pattern = {}
    self.admin_commands = set()
    self.command_handlers = {}

    self.setup_handlers()
    self._build_command_mappings()
@staticmethod
def load_config(config_path: Optional[str] = None) -> dict:
"""Load and validate the bot configuration from a YAML file.
Args:
config_path: Path to the YAML configuration file. If None, uses default path.
Returns:
dict: The loaded and validated configuration dictionary with default values applied.
"""
if config_path is None:
config_path = Path(__file__).parent / 'config.yaml'
# Load config file
try:
with open(config_path) as f:
config = yaml.safe_load(f)
except FileNotFoundError:
config = {
'irc': {},
'stream': {},
'announce': {
'format': "\x02Now playing:\x02 {song}",
'ignore_patterns': ['Unknown', 'Unable to fetch metadata']
},
'logging': {
'level': 'INFO',
'format': '%(asctime)s - %(levelname)s - %(message)s',
'error_format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
'datefmt': '%H:%M:%S'
}
}
# Ensure logging config exists with defaults
if 'logging' not in config:
config['logging'] = {}
if 'format' not in config['logging']:
config['logging']['format'] = '%(asctime)s - %(levelname)s - %(message)s'
if 'error_format' not in config['logging']:
config['logging']['error_format'] = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
if 'datefmt' not in config['logging']:
config['logging']['datefmt'] = '%H:%M:%S'
if 'level' not in config['logging']:
config['logging']['level'] = 'INFO'
return config
def should_announce_song(self, song: str) -> bool:
    """Decide whether *song* may be announced.

    The comparison against ``self.ignore_patterns`` is a case-insensitive
    substring test.

    Args:
        song: The song title to check.

    Returns:
        bool: False as soon as one ignore pattern occurs in the title,
        True when none does.
    """
    for ignored in self.ignore_patterns:
        if ignored.lower() in song.lower():
            return False
    return True
def setup_handlers(self):
    """Register all IRC event handlers and command handlers on ``self.bot``.

    Registered in this order: the connect and join handlers, a catch-all
    debug message handler, then one handler per command (np, help, restart,
    quit, reconnect, stop, start).  Every command handler is also stored in
    ``self.command_handlers`` under its function name.

    Two conventions the rest of the class relies on:
    - The first docstring line of each command handler must read
      ``!command: description``; the help command and
      ``_build_command_mappings`` parse it at runtime.
    - ``_build_command_mappings`` inspects the closure cells of the stored
      handlers to detect admin commands, so the handlers below should not
      capture additional local objects without checking that logic.
    """
    @self.bot.on_connected()
    async def connected():
        try:
            self.channel = await self.bot.join(self.channel_name)
            self.logger.info(f"Connected to IRC and joined {self.channel_name}")
        except Exception as e:
            self.logger.error(f"Error joining channel: {e}")
        if self.should_monitor:
            await self.start_monitoring()

    @self.bot.on_join()
    async def on_join(channel):
        # Silently store the channel without logging
        if not self.channel:
            self.channel = channel

    def create_command_pattern(cmd: str) -> re.Pattern:
        """Create a regex pattern for matching IRC commands.

        The pattern handles the command prefix and, when
        ``require_nick_prefix`` is set, an optional ``nick: `` / ``nick, ``
        prefix.  Both the nick and the command prefix are escaped, because
        IRC nicks may contain regex metacharacters such as ``[``, ``]``,
        ``\\``, ``^``, ``{``, ``}`` and ``|``.

        NOTE(review): the nick prefix is optional even when
        ``require_nick_prefix`` is true; confirm whether it should be
        mandatory in that mode.

        Args:
            cmd: The command name without prefix.

        Returns:
            re.Pattern: Compiled regex pattern for matching the command.
        """
        prefix = re.escape(self.cmd_prefix)
        if self.require_nick_prefix:
            nick = re.escape(self.config['irc']['nick'])
            pattern = f"^({nick}[:,] )?{prefix}{cmd}($| .*$)"
        else:
            pattern = f"^{prefix}{cmd}($| .*$)"
        self.logger.debug(f"Created command pattern for '{cmd}': {pattern}")
        return re.compile(pattern)

    # Compile every command pattern once; the debug handler used to rebuild
    # all of them for each incoming message.
    command_patterns = {
        cmd: create_command_pattern(cmd)
        for cmd in ['np', 'help', 'restart', 'quit', 'reconnect', 'stop', 'start']
    }

    # Global message handler for debugging
    @self.bot.on_message()
    async def debug_message(message):
        """Debug handler for logging all received messages.

        Args:
            message: The IRC message object to log.
        """
        try:
            # Log full message details only at DEBUG level
            self.logger.debug(
                f"Received message: "
                f"recipient={getattr(message, 'recipient', None)!r}, "
                f"sender={getattr(message, 'sender', None)!r}, "
                f"text={getattr(message, 'text', None)!r}"
            )
            # Test each command pattern
            msg = getattr(message, 'text', None)
            if msg and isinstance(msg, str):
                for cmd, pattern in command_patterns.items():
                    if pattern.match(msg):
                        self.logger.debug(f"Command matched: {cmd}")
        except Exception as e:
            self.logger.error(f"Error in debug message handler: {e}")

    # Command handlers with additional debug logging
    @self.bot.on_message(command_patterns['np'])
    async def now_playing(message):
        """!np: Show the currently playing song
        Displays the current song title from the Icecast stream.
        The song title is fetched from the stream's metadata or JSON status.
        Args:
        message: IRC message object
        """
        self.logger.debug(f"np handler called with message: {getattr(message, 'text', None)!r}")
        recipient = getattr(message, 'recipient', None)
        # Check if recipient is a Channel object
        is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
        self.logger.debug(f"Recipient check - is_channel: {is_channel}, recipient: {recipient}")
        if not self.allow_private_commands and not is_channel:
            self.logger.debug("Ignoring private np command")
            return
        try:
            if is_channel:
                await recipient.message(self.reply.format(song=self.current_song))
                self.logger.debug("Successfully sent np reply")
            else:
                self.logger.debug("Could not send np reply - invalid recipient")
        except Exception as e:
            self.logger.error(f"Error in np handler: {e}", exc_info=True)
    self.command_handlers['now_playing'] = now_playing

    @self.bot.on_message(command_patterns['help'])
    async def help_command(message):
        """!help: Show available commands
        Display all available commands or detailed help for a specific command.
        Usage: !help [command]
        Args:
        message: IRC message object
        """
        self.logger.debug(f"help handler called with message: {getattr(message, 'text', None)!r}")
        recipient = getattr(message, 'recipient', None)
        # Check if recipient is a Channel object
        is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
        self.logger.debug(f"Recipient check - is_channel: {is_channel}, recipient: {recipient}")
        if not self.allow_private_commands and not is_channel:
            self.logger.debug("Ignoring private help command")
            return
        try:
            # Parse message to check if a specific command was requested
            msg_text = getattr(message, 'text', '')
            parts = msg_text.strip().split()
            self.logger.debug(f"Available command patterns: {list(self.pattern_to_method.keys())}")
            self.logger.debug(f"Admin commands: {list(self.admin_commands)}")
            if len(parts) > 1:
                # Specific command help requested
                pattern = parts[1].lower()
                if pattern.startswith(self.cmd_prefix):
                    pattern = pattern[len(self.cmd_prefix):]  # Remove prefix if included
                self.logger.debug(f"Looking up help for command pattern: {pattern}")
                # Find the command handler using our mapping
                method_name = self.pattern_to_method.get(pattern)
                self.logger.debug(f"Found method name: {method_name}")
                if method_name:
                    handler = self.command_handlers.get(method_name)
                    if handler and handler.__doc__:
                        # Get the first line of the docstring
                        first_line = handler.__doc__.strip().split('\n')[0]
                        # Format it using the template and add (admin only) if needed
                        desc = first_line.split(':', 1)[1].strip()
                        if pattern in self.admin_commands:
                            desc = f"{desc} (admin only)"
                        help_text = self.help_specific_format.format(
                            prefix=self.cmd_prefix,
                            cmd=pattern,
                            desc=desc
                        )
                        # Check if user has permission for this command
                        if pattern in self.admin_commands and not self.is_admin(message.sender):
                            help_text = "You don't have permission to use this command."
                    else:
                        help_text = f"No help available for command: {pattern}"
                else:
                    help_text = f"Unknown command: {pattern}"
            else:
                # Build command list with proper formatting
                formatted_groups = []
                # Add general commands first
                general_commands = []
                for pattern, method_name in self.pattern_to_method.items():
                    if pattern not in self.admin_commands:  # If not an admin command
                        handler = self.command_handlers.get(method_name)
                        if handler and handler.__doc__:
                            first_line = handler.__doc__.strip().split('\n')[0]
                            desc = first_line.split(':', 1)[1].strip()
                            general_commands.append(
                                self.help_list_format.format(
                                    cmd=f"{self.cmd_prefix}{pattern}",
                                    desc=desc
                                )
                            )
                if general_commands:
                    formatted_groups.append(self.help_list_separator.join(general_commands))
                # Add admin commands if user is admin
                if self.is_admin(message.sender):
                    admin_commands = []
                    for pattern in sorted(self.admin_commands):
                        method_name = self.pattern_to_method.get(pattern)
                        if method_name:
                            handler = self.command_handlers.get(method_name)
                            if handler and handler.__doc__:
                                first_line = handler.__doc__.strip().split('\n')[0]
                                desc = first_line.split(':', 1)[1].strip()
                                # Don't add (admin only) in the list view
                                admin_commands.append(
                                    self.help_list_format.format(
                                        cmd=f"{self.cmd_prefix}{pattern}",
                                        desc=desc
                                    )
                                )
                    if admin_commands:
                        formatted_groups.append("Admin: " + self.help_list_separator.join(admin_commands))
                help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | " + " | ".join(formatted_groups)
            if is_channel:
                await recipient.message(help_text)
                self.logger.debug("Successfully sent help reply")
            else:
                self.logger.debug("Could not send help reply - invalid recipient")
        except Exception as e:
            self.logger.error(f"Error in help handler: {e}", exc_info=True)
    self.command_handlers['help_command'] = help_command

    @self.bot.on_message(command_patterns['restart'])
    @self.admin_required
    async def restart_bot(message):
        """!restart: Restart the bot (admin only)
        Gracefully shuts down the bot and signals the bot.sh script
        to restart it. This ensures a clean restart.
        Args:
        message: IRC message object
        """
        self.logger.info(f"Restart command received from {message.sender}")
        await self.stop_monitoring()
        try:
            await self.bot.quit("Restarting...")
            # Signal restart through Unix Domain Socket
            await RestartManager.signal_restart(self.bot_id)
            self.should_exit = True
        except Exception as e:
            self.logger.error(f"Error during restart: {e}")
    self.command_handlers['restart_bot'] = restart_bot

    @self.bot.on_message(command_patterns['quit'])
    @self.admin_required
    async def quit_bot(message):
        """!quit: Shutdown the bot (admin only)
        Gracefully shuts down the bot and exits without restarting.
        Args:
        message: IRC message object
        """
        self.logger.info(f"Quit command received from {message.sender}")
        await self.stop_monitoring()
        try:
            await self.bot.quit("Shutting down...")
            self.should_exit = True
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")
    self.command_handlers['quit_bot'] = quit_bot

    @self.bot.on_message(command_patterns['reconnect'])
    @self.admin_required
    async def reconnect_stream(message):
        """!reconnect: Reconnect to the stream (admin only)
        Attempts to reconnect to the stream and verifies the connection.
        Reports success or failure back to the channel.
        Args:
        message: IRC message object
        """
        self.logger.info(f"Reconnect command received from {message.sender}")
        success = await self.restart_monitoring()
        if not success and hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
            await message.recipient.message("Stream reconnection may have failed. Check logs for details.")
    self.command_handlers['reconnect_stream'] = reconnect_stream

    # The two handlers below share their names with methods of this class;
    # inside them ``self.stop_monitoring()`` / ``self.start_monitoring()``
    # resolve to the methods, not to these local functions.
    @self.bot.on_message(command_patterns['stop'])
    @self.admin_required
    async def stop_monitoring(message):
        """!stop: Stop stream monitoring (admin only)
        Stops monitoring the stream for metadata changes.
        The bot remains connected to IRC.
        Args:
        message: IRC message object
        """
        self.logger.info(f"Stop command received from {message.sender}")
        await self.stop_monitoring()
        if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
            await message.recipient.message("Stream monitoring stopped.")
    self.command_handlers['stop_monitoring'] = stop_monitoring

    @self.bot.on_message(command_patterns['start'])
    @self.admin_required
    async def start_monitoring(message):
        """!start: Start stream monitoring (admin only)
        Starts monitoring the stream for metadata changes.
        Will announce new songs in the channel.
        Args:
        message: IRC message object
        """
        self.logger.info(f"Start command received from {message.sender}")
        await self.start_monitoring()
        if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
            await message.recipient.message("Stream monitoring started.")
    self.command_handlers['start_monitoring'] = start_monitoring
def _build_command_mappings(self):
"""Build bidirectional mappings between command patterns and method names.
Creates two mappings:
- pattern_to_method: Maps IRC command pattern (e.g. 'np') to method name
- method_to_pattern: Maps method name to IRC command pattern
Command patterns are extracted from the first line of each handler's docstring,
which must be in the format "!command: description".
"""
self.logger.debug("Building command mappings...")
# Get all command handlers
for method_name, handler in self.command_handlers.items():
if not handler.__doc__:
continue
self.logger.debug(f"Checking handler: {method_name}")
# Check if this is a command handler by looking at the docstring
if handler.__doc__.strip().startswith('!'):
# Extract command pattern from docstring
first_line = handler.__doc__.strip().split('\n')[0]
pattern = first_line.split(':', 1)[0].strip('!')
self.logger.debug(f"Found command in docstring: {pattern} -> {method_name}")
# Store both mappings
self.pattern_to_method[pattern] = method_name
self.method_to_pattern[method_name] = pattern
# Check if this is an admin command by looking at the decorator
if hasattr(handler, '__closure__') and handler.__closure__:
for cell in handler.__closure__:
if isinstance(cell.cell_contents, type(self.admin_required)):
self.admin_commands.add(pattern)
self.logger.debug(f"Marked {pattern} as admin command")
break
self.logger.debug(f"Mapped command pattern '{pattern}' to method '{method_name}'")
# Log all mappings for debugging
self.logger.debug("Final command mappings:")
for pattern, method in self.pattern_to_method.items():
self.logger.debug(f" {pattern} -> {method}")
self.logger.debug("Admin commands:")
for cmd in sorted(self.admin_commands):
self.logger.debug(f" {cmd}")
def is_admin(self, user: str) -> bool:
    """Check if a user has admin privileges.

    The nickname is taken from ``user.name`` when the object has such an
    attribute, otherwise from the part of the string before ``!``.  The
    wildcard entry ``'*'`` in ``self.admin_users`` grants access to
    everyone.  Nicknames are compared case-insensitively because IRC treats
    nicknames that differ only in letter case as the same nickname.

    Args:
        user: Full IRC user string (nickname!username@hostname) or User object.

    Returns:
        bool: True if user has admin privileges, False otherwise (including
        when no nickname can be extracted).
    """
    try:
        if hasattr(user, 'name'):
            nickname = user.name
        else:
            nickname = user.split('!')[0] if '!' in user else user
    except Exception as e:
        # e.g. ``None`` or another non-string without a ``name`` attribute
        self.logger.error(f"Error extracting nickname: {e}")
        return False

    admins = self.admin_users
    if '*' in admins or nickname in admins:
        return True
    if not isinstance(nickname, str):
        return False
    folded = nickname.lower()
    return any(isinstance(admin, str) and admin.lower() == folded for admin in admins)
async def start_monitoring(self):
    """Launch the metadata monitor unless it is already flagged as running.

    Sets ``should_monitor``, schedules ``monitor_metadata`` as an asyncio
    task stored in ``monitor_task``, and marks the bot as monitoring.
    """
    if self.is_monitoring:
        return

    self.should_monitor = True
    self.monitor_task = asyncio.create_task(self.monitor_metadata())
    self.is_monitoring = True
    self.logger.info("Started metadata monitoring task")
async def stop_monitoring(self):
    """Shut down metadata monitoring.

    Clears the monitoring flags, terminates the stored subprocess if it is
    still running (killing it when it has not exited within two seconds),
    then cancels and awaits the monitoring task.  Errors in either step are
    logged rather than raised.
    """
    self.should_monitor = False
    self.is_monitoring = False

    # Step 1: stop the subprocess, if one is still alive.
    process_alive = self.current_process and self.current_process.returncode is None
    if process_alive:
        try:
            self.current_process.terminate()
            try:
                await asyncio.wait_for(self.current_process.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                # Did not exit in time: escalate.
                self.current_process.kill()
                await self.current_process.wait()
        except Exception as e:
            self.logger.error(f"Error terminating subprocess: {e}")
        finally:
            self.current_process = None

    # Step 2: cancel the monitoring task and wait for it to finish.
    task = self.monitor_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            self.logger.error(f"Error cancelling monitor task: {e}")
        finally:
            self.monitor_task = None

    self.logger.info("Stopped metadata monitoring task")
async def restart_monitoring(self):
    """Restart stream monitoring and try to confirm that it works.

    Stops monitoring, starts it again, then polls for up to ten seconds: as
    soon as a subprocess is running and ``fetch_json_metadata`` returns a
    usable title, the reconnection counts as confirmed.  The outcome is
    reported to the channel held in ``self.channel`` when that object looks
    like a channel (has a ``name`` starting with ``#``).

    Returns:
        bool: True if reconnection was successful and verified, False otherwise.
    """
    await self.stop_monitoring()
    # Captured once so all status messages go to the same channel.
    notify_channel = self.channel

    async def report(text):
        if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'):
            await notify_channel.message(text)

    try:
        await self.start_monitoring()

        began = time.time()
        while time.time() - began < 10:
            process = self.current_process
            if process and process.returncode is None:
                # A metadata fetch doubles as the connectivity probe.
                test_song = await self.fetch_json_metadata()
                if test_song and "Unable to fetch metadata" not in test_song:
                    await report("Stream reconnected successfully.")
                    return True
            await asyncio.sleep(1)

        # Polling window elapsed without confirmation.
        await report("Stream reconnection attempt completed, but status uncertain. Check logs for details.")
        return False
    except Exception as e:
        self.logger.error(f"Error during stream reconnection: {e}")
        await report(f"Failed to reconnect to stream: {str(e)}")
        return False
async def fetch_json_metadata(self):
    """Fetch the current song title from the Icecast JSON status endpoint.

    Several base URLs derived from ``self.stream_url`` are tried in order,
    each with ``/status-json.xsl`` appended.  Identical candidates are
    requested only once, one HTTP session is shared by all attempts, and
    each request is bounded by a ten second timeout.  A timeout or
    connection error on one URL moves on to the next.

    Returns:
        str: The title of the source whose ``listenurl`` ends with
        ``self.stream_endpoint``; ``"Unable to fetch metadata"`` when no
        URL yields one; ``"Error fetching metadata"`` on an unexpected
        error.
    """
    try:
        candidates = [
            self.stream_url,  # Original URL
            self.stream_url.replace('live.', 'listen.'),  # Try listen. instead of live.
            '/'.join(self.stream_url.split('/')[:-1]) if '/' in self.stream_url else self.stream_url  # Try parent path
        ]
        # Drop duplicates while keeping order (e.g. no 'live.' in the URL).
        base_urls = list(dict.fromkeys(candidates))

        # Bound each request; aiohttp's default total timeout is minutes long.
        request_timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            for base_url in base_urls:
                url = f"{base_url}/status-json.xsl"
                try:
                    self.logger.debug(f"Attempting to fetch metadata from: {url}")
                    async with session.get(url) as response:
                        if response.status != 200:
                            continue
                        data = await response.text()
                    json_data = json.loads(data)
                    if 'icestats' not in json_data:
                        continue
                    sources = json_data['icestats'].get('source', [])
                    # A single mount is reported as an object, not a list.
                    if not isinstance(sources, list):
                        sources = [sources]
                    # Find our stream
                    for src in sources:
                        if src.get('listenurl', '').endswith(self.stream_endpoint):
                            title = src.get('title') or src.get('song') or src.get('current_song')
                            if title:
                                return title
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    self.logger.debug(f"Failed to fetch from {url}: {e}")
                    continue
                except json.JSONDecodeError as e:
                    self.logger.debug(f"Failed to parse JSON from {url}: {e}")
                    continue

        self.logger.warning("Unable to fetch metadata from any URL")
        return "Unable to fetch metadata"
    except Exception as e:
        self.logger.error(f"Error fetching JSON metadata: {e}", exc_info=True)
        return "Error fetching metadata"
async def monitor_metadata(self):
    """Monitor the Icecast stream for metadata changes.

    Runs ``curl`` against ``{stream_url}/{stream_endpoint}`` with the
    ``Icy-MetaData: 1`` header and scans its output for the
    ``StreamTitle='`` marker. The marker is only used as a trigger: the
    title itself is obtained from ``fetch_json_metadata``. If no marker has
    triggered a fetch for ``json_check_interval`` seconds, the JSON
    endpoint is polled anyway.

    The curl process is replaced (after a 5 second pause) when too many
    empty reads occur in a row, when no data arrives for ``data_timeout``
    seconds, or when an unexpected error is raised while processing. The
    outer loop runs for as long as ``self.should_monitor`` is true.
    New titles are stored in ``self.current_song`` and passed to
    ``announce_song``.
    """
    title_marker = b"StreamTitle='"
    # Strings returned by fetch_json_metadata when it could not get a title.
    fetch_failures = ("Unable to fetch metadata", "Error fetching metadata")

    # Initial delay before the first connection attempt.
    await asyncio.sleep(5)

    while self.should_monitor:
        try:
            cmd = [
                'curl',
                '-s',
                '-H', 'Icy-MetaData: 1',
                '--no-buffer',
                f"{self.stream_url}/{self.stream_endpoint}"
            ]
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            self.current_process = process  # Store the process reference
            self.logger.info("Started stream monitoring")

            last_data_received = time.time()
            buffer = b""
            last_json_check = time.time()
            json_check_interval = 60  # Fallback interval if ICY updates fail
            data_timeout = 180  # 3 minutes without data is considered a failure
            empty_chunks_count = 0
            max_empty_chunks = 5  # After 5 empty chunks in a row, reconnect

            while True:
                try:
                    chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
                    if chunk:
                        last_data_received = time.time()
                        buffer += chunk
                        empty_chunks_count = 0  # Reset counter on successful data
                    else:
                        empty_chunks_count += 1
                        self.logger.warning(
                            f"Received empty chunk from stream ({empty_chunks_count}/{max_empty_chunks})"
                        )
                        if empty_chunks_count >= max_empty_chunks:
                            self.logger.error("Too many empty chunks in a row, restarting connection")
                            break

                    if time.time() - last_data_received > data_timeout:
                        self.logger.error(f"No data received for {data_timeout} seconds, restarting connection")
                        break

                    current_time = time.time()

                    # Periodic health check
                    if current_time - self.last_health_check >= self.health_check_interval:
                        self.logger.info(
                            f"Monitor status: Active - Last data received {int(current_time - last_data_received)}s ago"
                        )
                        self.last_health_check = current_time

                    # Look for metadata marker but fetch from JSON
                    marker_pos = buffer.find(title_marker)
                    if marker_pos != -1:
                        new_song = await self.fetch_json_metadata()
                        fetch_failed = any(failure in new_song for failure in fetch_failures)
                        # Neither failure string is a song title, so both are
                        # kept out of current_song here.
                        if new_song and new_song != self.current_song and not fetch_failed:
                            self.logger.info(f"Now Playing: {new_song}")
                            self.current_song = new_song
                            await self.announce_song(new_song)

                        # Clear buffer after metadata marker. The closing "';"
                        # may not have arrived yet (the ICY block can be split
                        # across reads); in that case drop everything through
                        # the marker itself so the same marker cannot trigger
                        # another fetch on the next iteration.
                        end_pos = buffer.find(b"';", marker_pos)
                        if end_pos == -1:
                            buffer = buffer[marker_pos + len(title_marker):]
                        else:
                            buffer = buffer[end_pos + 2:]
                        last_json_check = current_time

                    # Keep buffer size reasonable
                    if len(buffer) > 65536:
                        buffer = buffer[-32768:]

                    # Fallback JSON check if ICY updates aren't coming through
                    if current_time - last_json_check >= json_check_interval:
                        new_song = await self.fetch_json_metadata()
                        if "Unable to fetch metadata" in new_song:
                            if time.time() - last_data_received > data_timeout:
                                break
                        # NOTE(review): unlike the marker path above, a failure
                        # string is stored and handed to announce_song here;
                        # presumably should_announce_song filters it - confirm.
                        if new_song and new_song != self.current_song:
                            self.logger.info(f"Now Playing (fallback): {new_song}")
                            self.current_song = new_song
                            await self.announce_song(new_song)
                        last_json_check = current_time

                    await asyncio.sleep(0.1)
                except asyncio.TimeoutError:
                    # A single 30s read timeout is tolerated; only a silence
                    # longer than data_timeout forces a reconnect.
                    self.logger.warning("Timeout while reading stream data")
                    if time.time() - last_data_received > data_timeout:
                        self.logger.error("Stream read timeout exceeded limit, restarting connection")
                        break
                    continue
                except Exception as e:
                    self.logger.error(f"Error in stream processing loop: {e}")
                    break

            # Check if process is still running and terminate if needed
            if process.returncode is None:
                try:
                    process.terminate()
                    await asyncio.wait_for(process.wait(), timeout=5.0)
                except asyncio.TimeoutError:
                    self.logger.warning("Process did not terminate gracefully, killing it")
                    process.kill()
                    await process.wait()
                except Exception as e:
                    self.logger.error(f"Error terminating process: {e}")
                finally:
                    self.current_process = None

            self.logger.warning("Stream monitor ended, restarting in 5 seconds...")
            await asyncio.sleep(5)
        except Exception as e:
            self.logger.error(f"Stream monitor error: {e}")
            await asyncio.sleep(5)
async def announce_song(self, song: str):
    """Send a "now playing" line for *song* to the stored IRC channel.

    Args:
        song: The song title to announce.

    Nothing is sent when ``self.channel`` is falsy or when
    ``self.should_announce_song(song)`` rejects the title. When the stored
    channel object has no ``name`` starting with ``#``, an error is logged
    instead. The text sent is ``self.reply`` formatted with ``song``.
    Any exception is logged and swallowed.
    """
    try:
        if not (self.channel and self.should_announce_song(song)):
            return

        # Use the stored channel object directly
        target = self.channel
        looks_like_channel = hasattr(target, 'name') and target.name.startswith('#')
        if not looks_like_channel:
            self.logger.error(f"Could not announce song - invalid channel object: {self.channel}")
            return

        await target.message(self.reply.format(song=song))
        self.logger.debug(f"Successfully announced song: {song}")
    except Exception as exc:
        self.logger.error(f"Error announcing song: {exc}", exc_info=True)
async def start(self):
    """Start the restart manager, then run the IRC bot until it returns.

    On the way out (normal return or exception), and only when
    ``self.should_exit`` is set, the monitor task is cancelled and awaited
    and the restart manager is cleaned up.
    """
    try:
        # The restart manager is started before the bot itself.
        await self.restart_manager.start()
        await self.bot.run()
    finally:
        if self.should_exit:
            # Clean up any remaining tasks
            monitor = self.monitor_task
            if monitor:
                monitor.cancel()
                try:
                    await monitor
                except asyncio.CancelledError:
                    # Expected outcome of the cancel() above.
                    pass
                except Exception as cleanup_error:
                    self.logger.error(f"Error during cleanup: {cleanup_error}")

            # Clean up restart manager
            self.restart_manager.cleanup()
def format_help_section(self, section_config: dict, prefix: str) -> List[str]:
    """Format one help section according to its template.

    For every command in ``section_config['commands']`` the bound methods
    of this instance are searched for one whose name ends with
    ``_<command>``. If such a handler has a docstring whose first line has
    the form ``<something>: <text>``, ``<text>`` replaces the description
    from the template. In every other case (no handler, no docstring, no
    colon, or nothing after the colon) the template description is kept.

    Args:
        section_config: Section configuration with a ``commands`` mapping
            (command -> description) and a ``format`` string that accepts
            ``cmd`` and ``desc``.
        prefix: Command prefix to put in front of each command name.

    Returns:
        List[str]: One formatted help line per command, in mapping order.
    """
    command_map = section_config['commands']
    # inspect.getmembers walks every attribute of the instance, so collect
    # the bound methods once instead of once per command.
    bound_methods = (
        inspect.getmembers(self, predicate=inspect.ismethod) if command_map else []
    )

    lines = []
    for cmd, desc in command_map.items():
        # Try to get the docstring from the command handler
        suffix = f"_{cmd}"
        handler = next(
            (method for name, method in bound_methods if name.endswith(suffix)),
            None,
        )

        if handler and handler.__doc__:
            # Extract the first line of the docstring
            first_line = handler.__doc__.strip().split('\n')[0]
            # Remove the command prefix and colon. partition() never raises,
            # so a docstring without a colon falls back to the template text
            # instead of failing with IndexError.
            _, colon, summary = first_line.partition(':')
            summary = summary.strip()
            if colon and summary:
                desc = summary

        lines.append(section_config['format'].format(
            cmd=f"\x02{prefix}{cmd}\x02",  # Bold the command
            desc=desc
        ))
    return lines
async def help_command_fallback(self, message):
    """Send the hardcoded help line for the bot.

    The line lists the bot version and the public commands; for senders
    accepted by ``self.is_admin`` the admin commands are appended. The
    reply is only sent when ``message.recipient`` has a ``name`` starting
    with ``#``. Any exception is logged and swallowed.

    Args:
        message: The IRC message object that triggered this command.
    """
    try:
        prefix = self.cmd_prefix

        # Commands are wrapped in \x02 (IRC bold).
        base_cmds = f"\x02{prefix}np\x02 (current song) • \x02{prefix}help\x02 (this help)"
        pieces = [f"\x02Icecast Bot v{self.VERSION}\x02 | Commands: {base_cmds}"]

        if self.is_admin(message.sender):
            admin_cmds = "".join((
                f"\x02{prefix}start\x02/\x02stop\x02 (monitoring) • ",
                f"\x02{prefix}reconnect\x02 (stream) • ",
                f"\x02{prefix}restart\x02 (bot) • ",
                f"\x02{prefix}quit\x02 (shutdown)",
            ))
            pieces.append(f" | Admin: {admin_cmds}")

        help_text = "".join(pieces)

        destination = message.recipient
        if hasattr(destination, 'name') and destination.name.startswith('#'):
            await destination.message(help_text)
            self.logger.debug("Successfully sent fallback help reply")
    except Exception as exc:
        self.logger.error(f"Error in help fallback handler: {exc}", exc_info=True)
def admin_required(self, f):
    """Wrap a command handler so that only admins can run it.

    The wrapper ignores the command when it did not arrive in a channel
    and ``self.allow_private_commands`` is false. Non-admin senders get a
    refusal message (in channels only) and the handler is not called.

    As a side effect of decorating, when the handler's docstring starts
    with ``!`` the text before its first colon (with ``!`` stripped) is
    added to ``self.admin_commands``.

    Args:
        f: The command handler coroutine function to wrap.

    Returns:
        The wrapping coroutine function, carrying ``f``'s ``__doc__`` and
        ``__name__``.
    """
    async def wrapped(message, *args, **kwargs):
        target = getattr(message, 'recipient', None)
        in_channel = hasattr(target, 'name') and target.name.startswith('#')

        # Private invocations are dropped silently unless explicitly allowed.
        if not (self.allow_private_commands or in_channel):
            return

        if self.is_admin(message.sender):
            return await f(message, *args, **kwargs)

        if in_channel:
            await target.message("You don't have permission to use this command.")
        return

    # Copy the docstring and other attributes
    wrapped.__doc__ = f.__doc__
    wrapped.__name__ = f.__name__

    # Add the command pattern to admin_commands set
    doc_text = (f.__doc__ or "").strip()
    if doc_text.startswith('!'):
        self.admin_commands.add(doc_text.split(':', 1)[0].strip('!'))

    return wrapped
async def run_multiple_bots(config_paths: List[str]):
    """Create one bot per config file and run them all concurrently.

    A config whose bot cannot be constructed is logged and skipped. If no
    bot could be constructed at all, an error is logged and the function
    returns; otherwise it waits until every bot task has finished.

    Args:
        config_paths: Paths of the config files, one per bot.
    """
    instances = []
    pending = []

    for path in config_paths:
        try:
            instance = IcecastBot(path)
            instances.append(instance)
            # Each bot gets its own task so they run side by side.
            pending.append(asyncio.create_task(run_single_bot(instance)))
        except Exception as exc:
            logging.error(f"Failed to initialize bot with config {path}: {exc}")

    if instances:
        # Wait for all bots to complete
        await asyncio.gather(*pending)
        return

    logging.error("No bots were successfully initialized")
async def run_single_bot(bot: IcecastBot):
    """Run a bot instance, replacing it with a fresh one on restart.

    After ``bot.start()`` returns, ``bot.restart_manager.should_restart``
    decides whether a new ``IcecastBot`` is built from the same
    ``config_path`` and run in turn. Restarts are handled by a loop rather
    than by recursion, so each finished instance has its monitor task
    cancelled *before* its replacement starts, and repeated restarts do not
    pile up call frames.

    Errors raised while running a bot (or while constructing its
    replacement) are logged through that bot's logger and end the loop.

    Args:
        bot: The IcecastBot instance to run
    """
    while bot is not None:
        replacement = None
        try:
            await bot.start()

            # Check if we should restart this bot
            if bot.restart_manager.should_restart:
                # Clean up
                bot.restart_manager.cleanup()
                # Create a new instance; it is started on the next iteration
                replacement = IcecastBot(bot.config_path)
            # If should_exit is True but should_restart is False, just exit cleanly
            elif bot.should_exit:
                bot.logger.info("Bot shutting down...")
                bot.restart_manager.cleanup()
        except Exception as e:
            bot.logger.error(f"Error running bot {bot.config['irc']['nick']}: {e}")
        finally:
            # Ensure cleanup happens
            if bot.monitor_task:
                bot.monitor_task.cancel()
                try:
                    await bot.monitor_task
                except asyncio.CancelledError:
                    pass
                except Exception as e:
                    # A monitor task that already failed must not prevent
                    # the replacement bot from starting.
                    bot.logger.error(f"Error during cleanup: {e}")
        bot = replacement
if __name__ == "__main__":
    # Command line: positional config paths select multi-bot mode; without
    # them a single bot is built from --config and the remaining options
    # are written into its config.
    parser = argparse.ArgumentParser(description='Icecast IRC Bot')
    parser.add_argument('configs', nargs='*', help='Paths to config files')
    parser.add_argument('--config', type=str, help='Path to single config file')
    parser.add_argument('--irc-host', type=str, help='IRC server host')
    parser.add_argument('--irc-port', type=int, help='IRC server port')
    parser.add_argument('--irc-nick', type=str, help='IRC nickname')
    parser.add_argument('--irc-channel', type=str, help='IRC channel')
    parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)')
    parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)')
    parser.add_argument('--cmd-prefix', type=str, help='Command prefix character(s)')
    args = parser.parse_args()

    # Set up the event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def run_bot():
        """Run the bot(s) selected on the command line; exit 1 on failure."""
        try:
            if args.configs:
                # Multi-bot mode
                await run_multiple_bots(args.configs)
                return

            # Single-bot mode
            bot = IcecastBot(args.config)

            # Apply any command line overrides to the config:
            # (config section, key, value from the command line)
            overrides = (
                ('irc', 'host', args.irc_host),
                ('irc', 'port', args.irc_port),
                ('irc', 'nick', args.irc_nick),
                ('irc', 'channel', args.irc_channel),
                ('stream', 'url', args.stream_url),
                ('stream', 'endpoint', args.stream_endpoint),
                ('commands', 'prefix', args.cmd_prefix),
            )
            for section, key, value in overrides:
                if not value:
                    continue
                # Only the 'commands' section is created on demand.
                if section == 'commands' and 'commands' not in bot.config:
                    bot.config['commands'] = {}
                bot.config[section][key] = value

            await run_single_bot(bot)
        except Exception as e:
            logging.error(f"Unhandled exception: {e}")
            sys.exit(1)

    try:
        # Run the bot
        loop.run_until_complete(run_bot())
    finally:
        try:
            # Cancel any remaining tasks
            leftovers = asyncio.all_tasks(loop)
            for leftover in leftovers:
                leftover.cancel()

            # Give tasks a chance to cancel
            loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))

            # Finally close the loop
            loop.close()
        except Exception as e:
            logging.error(f"Error during cleanup: {e}")
            sys.exit(1)