initial implementation of logging with an abstracted logger

This commit is contained in:
cottongin 2025-02-24 23:12:15 -08:00
parent 0face72fd4
commit 9c978c4773
Signed by: cottongin
GPG Key ID: A0BD18428A296890
3 changed files with 287 additions and 135 deletions

121
logger.py Normal file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Logger module for the Icecast metadata IRC announcer.
Uses Loguru for simple yet powerful logging.
"""
import os
import sys
from pathlib import Path
from loguru import logger
class LogManager:
"""
Manages logging configuration for the Icecast metadata IRC announcer.
This class provides a simple interface for configuring and using Loguru
throughout the project. It handles log file rotation, formatting, and
different log levels.
"""
def __init__(self, config=None):
"""
Initialize the LogManager with optional configuration.
Args:
config: Optional dictionary with logging configuration.
If None, default configuration is used.
"""
self.config = config or {}
self.log_dir = Path(self.config.get('log_dir', 'logs'))
self.log_level = self.config.get('level', 'INFO').upper()
self.log_format = self.config.get('format',
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
# Create log directory if it doesn't exist
if not self.log_dir.exists():
self.log_dir.mkdir(parents=True, exist_ok=True)
# Configure logger
self._configure_logger()
def _configure_logger(self):
"""Configure Loguru logger with appropriate sinks and formats."""
# Remove default handler
logger.remove()
# Add console handler
logger.add(
sys.stderr,
format=self.log_format,
level=self.log_level,
colorize=True
)
# Add file handler with rotation
log_file = self.log_dir / "icecast_bot.log"
logger.add(
str(log_file),
format=self.log_format,
level=self.log_level,
rotation="10 MB", # Rotate when file reaches 10MB
compression="zip", # Compress rotated logs
retention="1 week", # Keep logs for 1 week
backtrace=True, # Include backtrace in error logs
diagnose=True # Include variables in error logs
)
def get_logger(self, name=None):
"""
Get a logger instance with the given name.
Args:
name: Optional name for the logger. If None, the calling module's name is used.
Returns:
A Loguru logger instance.
"""
if name:
return logger.bind(name=name)
return logger
@staticmethod
def set_level(level):
"""
Set the log level for all handlers.
Args:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
for handler_id in logger._core.handlers:
logger.configure(handler_id, level=level)
# Create a default instance for easy import
log_manager = LogManager()
get_logger = log_manager.get_logger
# Export the logger directly for simple usage
log = logger
# For convenience, export common log levels
debug = logger.debug
info = logger.info
warning = logger.warning
error = logger.error
critical = logger.critical
exception = logger.exception
# Example usage:
# from logger import log, debug, info, error
# log.info("This is an info message")
# debug("This is a debug message")
#
# # Or with a named logger:
# from logger import get_logger
# logger = get_logger("my_module")
# logger.info("This is a message from my_module")

300
main.py
View File

@ -17,6 +17,9 @@ import socket
import tempfile
import signal
# Import our custom logger
from logger import log, debug, info, warning, error, critical, exception, get_logger
# ANSI color codes for backward compatibility if needed
class Colors:
BLUE = "\033[94m"
@ -69,10 +72,9 @@ import socket
import tempfile
class RestartManager:
"""Manages bot restarts using Unix Domain Sockets.
"""Manages restart requests for the bot.
This class provides a clean way to handle bot restarts without using
flag files. Each bot instance gets its own Unix Domain Socket.
Uses a Unix domain socket to listen for restart commands.
"""
def __init__(self, bot_id: str):
@ -81,6 +83,9 @@ class RestartManager:
Args:
bot_id: Unique identifier for this bot instance
"""
# Get a logger for this class
self.logger = get_logger("RestartManager")
self.bot_id = bot_id
self.socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock"
self.server = None
@ -101,9 +106,9 @@ class RestartManager:
self._handle_restart_request,
str(self.socket_path)
)
print(f"Restart manager server started at {self.socket_path}")
self.logger.info(f"Restart manager server started at {self.socket_path}")
except Exception as e:
print(f"Error starting restart manager server: {e}")
self.logger.error(f"Error starting restart manager server: {e}")
# Continue without the restart manager
pass
@ -112,22 +117,26 @@ class RestartManager:
try:
data = await reader.read()
if data == b'restart':
self.logger.info("Received restart request")
self.should_restart = True
writer.close()
await writer.wait_closed()
except Exception as e:
pass
self.logger.error(f"Error handling restart request: {e}")
def cleanup(self):
"""Clean up the socket file."""
try:
if self.server:
self.server.close()
if self.socket_path.exists():
"""Clean up the restart manager resources."""
if self.server:
self.logger.debug("Closing restart manager server")
self.server.close()
if self.socket_path.exists():
self.logger.debug(f"Removing socket file: {self.socket_path}")
try:
self.socket_path.unlink()
except Exception as e:
pass
except Exception as e:
self.logger.error(f"Error removing socket file: {e}")
@staticmethod
async def signal_restart(bot_id: str):
"""Signal a specific bot to restart.
@ -135,15 +144,20 @@ class RestartManager:
Args:
bot_id: ID of the bot to restart
"""
# Get a logger for this static method
logger = get_logger("RestartManager")
socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock"
logger.info(f"Sending restart signal to bot {bot_id} at {socket_path}")
try:
reader, writer = await asyncio.open_unix_connection(str(socket_path))
writer.write(b'restart')
await writer.drain()
writer.close()
await writer.wait_closed()
logger.info(f"Restart signal sent to bot {bot_id}")
except Exception as e:
pass
logger.error(f"Error signaling restart to bot {bot_id}: {e}")
class IcecastBot:
"""An IRC bot that monitors an Icecast stream and announces track changes.
@ -173,29 +187,32 @@ class IcecastBot:
Args:
config_path: Path to the YAML configuration file. If None, uses default path.
"""
print(f"Initializing IcecastBot with config path: {config_path}")
# Get a logger for this class
self.logger = get_logger("IcecastBot")
self.logger.info(f"Initializing IcecastBot with config path: {config_path}")
# Store config path for potential restarts
self.config_path = config_path
# Load config
print("Loading configuration...")
self.logger.info("Loading configuration...")
self.config = self.load_config(config_path)
print(f"Configuration loaded: {self.config}")
self.logger.debug(f"Configuration loaded: {self.config}")
# Create unique bot ID from nick and endpoint
self.bot_id = f"{self.config['irc']['nick']}_{self.config['stream']['endpoint']}"
print(f"Bot ID: {self.bot_id}")
self.logger.info(f"Bot ID: {self.bot_id}")
# Initialize restart manager
print("Initializing restart manager...")
self.logger.info("Initializing restart manager...")
self.restart_manager = RestartManager(self.bot_id)
# Set up bot name
bot_name = f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}'
print(f"Bot name: {bot_name}")
self.logger.info(f"Bot name: {bot_name}")
# Initialize IRC bot with config and bot name
print("Creating IRC client...")
self.logger.info("Creating IRC client...")
try:
self.bot = Client(
host=self.config['irc']['host'],
@ -206,11 +223,11 @@ class IcecastBot:
bot_name=bot_name,
config=self.config
)
print("IRC client created successfully")
self.logger.info("IRC client created successfully")
except Exception as e:
print(f"Error creating IRC client: {e}")
self.logger.error(f"Error creating IRC client: {e}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
raise
# Set up instance variables
@ -265,19 +282,21 @@ class IcecastBot:
Returns:
dict: The loaded and validated configuration dictionary with default values applied.
"""
print(f"Loading config from: {config_path}")
logger = get_logger("IcecastBot.config")
logger.debug(f"Loading config from: {config_path}")
if config_path is None:
config_path = Path(__file__).parent / 'config.yaml'
print(f"No config path provided, using default: {config_path}")
logger.info(f"No config path provided, using default: {config_path}")
# Load config file
try:
print(f"Opening config file: {config_path}")
logger.debug(f"Opening config file: {config_path}")
with open(config_path) as f:
config = yaml.safe_load(f)
print(f"Config loaded successfully: {config}")
logger.debug(f"Config loaded successfully")
except FileNotFoundError:
print(f"Config file not found: {config_path}, using defaults")
logger.warning(f"Config file not found: {config_path}, using defaults")
config = {
'irc': {},
'stream': {},
@ -287,9 +306,8 @@ class IcecastBot:
}
}
except Exception as e:
print(f"Error loading config: {e}")
import traceback
traceback.print_exc()
logger.error(f"Error loading config: {e}")
exception("Failed to load configuration")
raise
return config
@ -305,11 +323,11 @@ class IcecastBot:
"""
try:
if not song:
print("Empty song title, not announcing")
self.logger.debug("Empty song title, not announcing")
return False
if not self.ignore_patterns:
print("No ignore patterns configured, announcing all songs")
self.logger.debug("No ignore patterns configured, announcing all songs")
return True
# Check each pattern
@ -318,21 +336,20 @@ class IcecastBot:
if not pattern:
continue
if not isinstance(pattern, str):
print(f"Invalid ignore pattern (not a string): {pattern}")
self.logger.warning(f"Invalid ignore pattern (not a string): {pattern}")
continue
if pattern.lower() in song.lower():
print(f"Song '{song}' matched ignore pattern '{pattern}', not announcing")
self.logger.debug(f"Song '{song}' matched ignore pattern '{pattern}', not announcing")
return False
except Exception as e:
print(f"Error checking ignore pattern '{pattern}': {str(e)}")
self.logger.error(f"Error checking ignore pattern '{pattern}': {str(e)}")
continue
print(f"Song '{song}' passed all ignore patterns, will announce")
self.logger.debug(f"Song '{song}' passed all ignore patterns, will announce")
return True
except Exception as e:
print(f"Exception in should_announce_song: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Exception in should_announce_song: {str(e)}")
exception("Failed to check if song should be announced")
# Default to not announcing if there's an error
return False
@ -762,7 +779,7 @@ class IcecastBot:
str: The current song title, or an error message if fetching failed.
"""
try:
print(f"Fetching metadata from {self.stream_url}/{self.stream_endpoint}")
self.logger.info(f"Fetching metadata from {self.stream_url}/{self.stream_endpoint}")
# Try different URL patterns
base_urls = [
self.stream_url, # Original URL
@ -773,13 +790,13 @@ class IcecastBot:
for base_url in base_urls:
try:
url = f"{base_url}/status-json.xsl"
print(f"Trying URL: {url}")
self.logger.debug(f"Trying URL: {url}")
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status == 200:
data = await response.text()
print(f"Received response from {url}")
self.logger.debug(f"Received response from {url}")
try:
json_data = json.loads(data)
@ -793,32 +810,32 @@ class IcecastBot:
if src.get('listenurl', '').endswith(self.stream_endpoint):
title = src.get('title') or src.get('song') or src.get('current_song')
if title:
print(f"Found title: {title}")
self.logger.debug(f"Found title: {title}")
return title
except json.JSONDecodeError as e:
print(f"JSON decode error for {url}: {str(e)}")
self.logger.warning(f"JSON decode error for {url}: {str(e)}")
continue
except aiohttp.ClientError as e:
print(f"Client error for {url}: {str(e)}")
self.logger.warning(f"Client error for {url}: {str(e)}")
continue
except json.JSONDecodeError as e:
print(f"JSON decode error for {url}: {str(e)}")
self.logger.warning(f"JSON decode error for {url}: {str(e)}")
continue
except asyncio.TimeoutError:
print(f"Timeout fetching metadata from {url}")
self.logger.warning(f"Timeout fetching metadata from {url}")
continue
except Exception as e:
print(f"Unexpected error fetching from {url}: {str(e)}")
self.logger.error(f"Unexpected error fetching from {url}: {str(e)}")
continue
print("All URL patterns failed, returning 'Unable to fetch metadata'")
self.logger.warning("All URL patterns failed, returning 'Unable to fetch metadata'")
return "Unable to fetch metadata"
except Exception as e:
print(f"Exception in fetch_json_metadata: {str(e)}")
self.logger.error(f"Exception in fetch_json_metadata: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
return f"Error fetching metadata: {str(e)}"
async def monitor_metadata(self):
@ -846,6 +863,7 @@ class IcecastBot:
stderr=asyncio.subprocess.PIPE
)
self.current_process = process # Store the process reference
self.logger.debug(f"Started curl process to monitor stream: {self.stream_url}/{self.stream_endpoint}")
last_data_received = time.time()
buffer = b""
@ -865,118 +883,129 @@ class IcecastBot:
else:
empty_chunks_count += 1
if empty_chunks_count >= max_empty_chunks:
self.logger.warning(f"Received {max_empty_chunks} empty chunks in a row, reconnecting")
break
if time.time() - last_data_received > data_timeout:
self.logger.warning(f"Data timeout exceeded ({data_timeout}s), reconnecting")
break
current_time = time.time()
# Periodic health check
if current_time - self.last_health_check >= self.health_check_interval:
pass
self.logger.debug("Performing periodic health check")
self.last_health_check = current_time
# Look for metadata marker but fetch from JSON
if b"StreamTitle='" in buffer:
try:
print("Detected StreamTitle marker in buffer, fetching metadata")
self.logger.debug("Detected StreamTitle marker in buffer, fetching metadata")
new_song = await self.fetch_json_metadata()
# Check if we should announce the song
if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song:
print(f"Song changed from '{self.current_song}' to '{new_song}'")
self.logger.info(f"Song changed from '{self.current_song}' to '{new_song}'")
self.current_song = new_song
# Try to announce the song
try:
await self.announce_song(new_song)
except Exception as e:
print(f"Error announcing song: {str(e)}")
self.logger.error(f"Error announcing song: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
else:
# No song change or unable to fetch metadata
print(f"No song change detected or unable to fetch metadata: {new_song}")
self.logger.debug(f"No song change detected or unable to fetch metadata: {new_song}")
# Clear buffer after metadata marker
marker_pos = buffer.find(b"StreamTitle='")
end_pos = buffer.find(b"';", marker_pos)
if end_pos > marker_pos:
buffer = buffer[end_pos + 2:]
print("Buffer cleared after metadata marker")
self.logger.debug("Buffer cleared after metadata marker")
else:
print(f"Could not find end of metadata marker, truncating buffer")
self.logger.warning(f"Could not find end of metadata marker, truncating buffer")
buffer = buffer[-8192:] # Keep last 8KB to avoid losing the end marker
# Update last check time
last_json_check = current_time
except Exception as e:
print(f"Error processing metadata marker: {str(e)}")
self.logger.error(f"Error processing metadata marker: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
# Reset buffer to avoid getting stuck in a loop
buffer = b""
# Keep buffer size reasonable
if len(buffer) > 65536:
buffer = buffer[-32768:]
self.logger.debug("Buffer size exceeded limit, truncated to 32KB")
# Fallback JSON check if ICY updates aren't coming through
if current_time - last_json_check >= json_check_interval:
try:
print("Performing fallback JSON check")
self.logger.debug("Performing fallback JSON check")
new_song = await self.fetch_json_metadata()
if "Unable to fetch metadata" in new_song:
print("Unable to fetch metadata in fallback check")
self.logger.warning("Unable to fetch metadata in fallback check")
if time.time() - last_data_received > data_timeout:
print("Data timeout exceeded, breaking loop")
self.logger.warning("Data timeout exceeded, breaking loop")
break
elif new_song and new_song != self.current_song:
print(f"Song changed in fallback check from '{self.current_song}' to '{new_song}'")
self.logger.info(f"Song changed in fallback check from '{self.current_song}' to '{new_song}'")
self.current_song = new_song
try:
await self.announce_song(new_song)
except Exception as e:
print(f"Error announcing song in fallback check: {str(e)}")
self.logger.error(f"Error announcing song in fallback check: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
else:
print(f"No song change detected in fallback check: {new_song}")
self.logger.debug(f"No song change detected in fallback check: {new_song}")
last_json_check = current_time
except Exception as e:
print(f"Error in fallback JSON check: {str(e)}")
self.logger.error(f"Error in fallback JSON check: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
# Still update the check time to avoid rapid retries
last_json_check = current_time
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
self.logger.warning("Timeout reading from stream")
if time.time() - last_data_received > data_timeout:
self.logger.warning(f"Data timeout exceeded ({data_timeout}s), reconnecting")
break
continue
except Exception as e:
self.logger.error(f"Error reading from stream: {str(e)}")
break
# Check if process is still running and terminate if needed
if process.returncode is None:
try:
self.logger.debug("Terminating curl process")
process.terminate()
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
self.logger.warning("Timeout waiting for curl process to terminate")
except Exception as e:
pass
self.logger.error(f"Error terminating curl process: {str(e)}")
finally:
self.current_process = None
self.logger.info("Reconnecting to stream after short delay")
await asyncio.sleep(5)
except Exception as e:
self.logger.error(f"Error in monitor_metadata: {str(e)}")
import traceback
self.logger.error(f"Traceback: {traceback.format_exc()}")
await asyncio.sleep(5)
async def announce_song(self, song: str):
@ -991,35 +1020,33 @@ class IcecastBot:
- The announcement format is configured
"""
try:
print(f"Attempting to announce song: {song}")
self.logger.info(f"Attempting to announce song: {song}")
if self.channel and self.should_announce_song(song):
print(f"Song passed filters, preparing to announce")
self.logger.debug(f"Song passed filters, preparing to announce")
# Use the stored channel object directly
if hasattr(self.channel, 'name') and self.channel.name.startswith('#'):
try:
formatted_message = self.reply.format(song=song)
print(f"Sending message to channel {self.channel.name}: {formatted_message}")
self.logger.debug(f"Sending message to channel {self.channel.name}: {formatted_message}")
await self.channel.message(self.reply.format(song=song))
print(f"Successfully announced song: {song}")
self.logger.info(f"Successfully announced song: {song}")
except Exception as e:
print(f"Error sending message to channel: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Error sending message to channel: {str(e)}")
exception("Failed to send message to channel")
else:
print(f"Channel object invalid or not a channel: {self.channel}")
self.logger.warning(f"Channel object invalid or not a channel: {self.channel}")
else:
if not self.channel:
print("Channel object is None or invalid")
self.logger.warning("Channel object is None or invalid")
elif not self.should_announce_song(song):
print(f"Song '{song}' matched ignore patterns, not announcing")
self.logger.debug(f"Song '{song}' matched ignore patterns, not announcing")
except Exception as e:
print(f"Exception in announce_song: {str(e)}")
import traceback
traceback.print_exc()
self.logger.error(f"Exception in announce_song: {str(e)}")
exception("Failed to announce song")
async def start(self):
"""Start the IRC bot and begin processing events."""
print("Starting IcecastBot...")
self.logger.info("Starting IcecastBot...")
try:
# Create a state file for the manager to detect
state_file = Path(tempfile.gettempdir()) / 'icecast-irc-bots.json'
@ -1043,7 +1070,7 @@ class IcecastBot:
# Save the state
with open(state_file, 'w') as f:
json.dump(state, f)
print(f"Created state file at {state_file}")
self.logger.info(f"Created state file at {state_file}")
# Register a signal handler to remove this bot from the state file on exit
def cleanup_state_file(signum, frame):
@ -1055,9 +1082,9 @@ class IcecastBot:
del current_state[self.bot_id]
with open(state_file, 'w') as f:
json.dump(current_state, f)
print(f"Removed {self.bot_id} from state file")
self.logger.info(f"Removed {self.bot_id} from state file")
except Exception as e:
print(f"Error cleaning up state file: {e}")
self.logger.error(f"Error cleaning up state file: {e}")
sys.exit(0)
# Register signal handlers
@ -1065,28 +1092,28 @@ class IcecastBot:
signal.signal(signal.SIGINT, cleanup_state_file)
except Exception as e:
print(f"Error creating state file: {e}")
self.logger.error(f"Error creating state file: {e}")
# Start the restart manager
print("Starting restart manager...")
self.logger.info("Starting restart manager...")
await self.restart_manager.start()
print("Restart manager started")
self.logger.info("Restart manager started")
# Start the bot
print("Starting IRC bot...")
self.logger.info("Starting IRC bot...")
await self.bot.run()
print("IRC bot started")
self.logger.info("IRC bot started")
except Exception as e:
print(f"Error starting bot: {e}")
self.logger.error(f"Error starting bot: {e}")
import traceback
traceback.print_exc()
self.logger.error(f"Traceback: {traceback.format_exc()}")
finally:
print("In start() finally block")
self.logger.info("In start() finally block")
if self.should_exit:
print("Bot should exit, cleaning up...")
self.logger.info("Bot should exit, cleaning up...")
# Clean up any remaining tasks
if self.monitor_task:
print("Canceling monitor task...")
self.logger.info("Canceling monitor task...")
self.monitor_task.cancel()
try:
await self.monitor_task
@ -1094,9 +1121,9 @@ class IcecastBot:
pass
# Clean up restart manager
print("Cleaning up restart manager...")
self.logger.info("Cleaning up restart manager...")
self.restart_manager.cleanup()
print("Restart manager cleaned up")
self.logger.info("Restart manager cleaned up")
def format_help_section(self, section_config: dict, prefix: str) -> List[str]:
"""Format a help section according to the template.
@ -1227,46 +1254,49 @@ async def run_single_bot(bot: IcecastBot):
Args:
bot: The IcecastBot instance to run
"""
print(f"Running single bot with ID: {bot.bot_id}")
bot.logger.info(f"Running single bot with ID: {bot.bot_id}")
try:
print("Starting bot...")
bot.logger.info("Starting bot...")
await bot.start()
print("Bot start completed")
bot.logger.info("Bot start completed")
# Check if we should restart this bot
if bot.restart_manager.should_restart:
print("Bot should restart")
bot.logger.info("Bot should restart")
# Clean up
print("Cleaning up restart manager...")
bot.logger.info("Cleaning up restart manager...")
bot.restart_manager.cleanup()
# Create and start a new instance
print("Creating new bot instance...")
bot.logger.info("Creating new bot instance...")
new_bot = IcecastBot(bot.config_path)
print("Starting new bot instance...")
bot.logger.info("Starting new bot instance...")
await run_single_bot(new_bot)
# If should_exit is True but should_restart is False, just exit cleanly
elif bot.should_exit:
print("Bot should exit cleanly")
bot.logger.info("Bot should exit cleanly")
bot.restart_manager.cleanup()
except Exception as e:
print(f"Error in run_single_bot: {e}")
bot.logger.error(f"Error in run_single_bot: {e}")
import traceback
traceback.print_exc()
bot.logger.error(f"Traceback: {traceback.format_exc()}")
finally:
print("In run_single_bot finally block")
bot.logger.info("In run_single_bot finally block")
# Ensure cleanup happens
if bot.monitor_task:
print("Canceling monitor task...")
bot.logger.info("Canceling monitor task...")
bot.monitor_task.cancel()
try:
await bot.monitor_task
print("Monitor task canceled")
bot.logger.info("Monitor task canceled")
except asyncio.CancelledError:
print("Monitor task canceled with CancelledError")
bot.logger.info("Monitor task canceled with CancelledError")
pass
if __name__ == "__main__":
print("Starting Icecast IRC Bot...")
# Initialize the logger
logger = get_logger("main")
logger.info("Starting Icecast IRC Bot...")
parser = argparse.ArgumentParser(description='Icecast IRC Bot')
parser.add_argument('configs', nargs='*', help='Paths to config files')
parser.add_argument('--config', type=str, help='Path to single config file')
@ -1279,25 +1309,25 @@ if __name__ == "__main__":
parser.add_argument('--cmd-prefix', type=str, help='Command prefix character(s)')
args = parser.parse_args()
print(f"Arguments parsed: {args}")
logger.info(f"Arguments parsed: {args}")
# Set up the event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print("Event loop created")
logger.info("Event loop created")
async def run_bot():
try:
print("Starting bot...")
logger.info("Starting bot...")
if args.configs:
# Multi-bot mode
print(f"Running in multi-bot mode with configs: {args.configs}")
logger.info(f"Running in multi-bot mode with configs: {args.configs}")
await run_multiple_bots(args.configs)
else:
# Single-bot mode
print(f"Running in single-bot mode with config: {args.config}")
logger.info(f"Running in single-bot mode with config: {args.config}")
bot = IcecastBot(args.config)
print("Bot instance created")
logger.info("Bot instance created")
# Apply any command line overrides to the config
if args.irc_host:
@ -1317,25 +1347,25 @@ if __name__ == "__main__":
bot.config['commands'] = {}
bot.config['commands']['prefix'] = args.cmd_prefix
print("Starting single bot...")
logger.info("Starting single bot...")
await run_single_bot(bot)
except Exception as e:
print(f"Error in run_bot: {e}")
logger.error(f"Error in run_bot: {e}")
import traceback
traceback.print_exc()
logger.error(f"Traceback: {traceback.format_exc()}")
sys.exit(1)
try:
# Run the bot
print("Running event loop...")
logger.info("Running event loop...")
loop.run_until_complete(run_bot())
except Exception as e:
print(f"Error in main: {e}")
logger.error(f"Error in main: {e}")
import traceback
traceback.print_exc()
logger.error(f"Traceback: {traceback.format_exc()}")
finally:
try:
print("Cleaning up...")
logger.info("Cleaning up...")
# Cancel any remaining tasks
for task in asyncio.all_tasks(loop):
task.cancel()
@ -1343,8 +1373,8 @@ if __name__ == "__main__":
loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True))
# Finally close the loop
loop.close()
print("Cleanup complete")
logger.info("Cleanup complete")
except Exception as e:
print(f"Error during cleanup: {e}")
logger.error(f"Error during cleanup: {e}")
import traceback
traceback.print_exc()
logger.error(f"Traceback: {traceback.format_exc()}")

View File

@ -1,3 +1,4 @@
asif
aiohttp
pyyaml
loguru>=0.7.0