Compare commits

..

5 Commits

11 changed files with 1215 additions and 536 deletions

View File

@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [1.2.0] - 2024-07-01
### Added
- New admin commands:
- !quiet - Disable song announcements while continuing to monitor the stream
- !unquiet - Re-enable song announcements
- Terminal commands for controlling announcement behavior:
- `quiet` - Disable song announcements via terminal
- `unquiet` - Enable song announcements via terminal
- Configuration option `quiet_on_start` to control whether the bot starts with announcements enabled or disabled
- Background task to monitor for quiet/unquiet requests from terminal commands
- Updated documentation in README.md with new commands
- Updated usage information in install.sh
### Changed
- Enhanced the RestartManager to handle quiet and unquiet commands
- Improved the BotManager class to support the new terminal commands
- Updated configuration example to include the new quiet_on_start option
## [1.0.1] - 2024-02-24
### Added
@ -56,6 +75,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Multi-bot support
- Configuration via YAML files
[Unreleased]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.1...HEAD
[Unreleased]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.2.0...HEAD
[1.2.0]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.1...v1.2.0
[1.0.1]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.0...v1.0.1
[1.0.0]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases/tag/v1.0.0

View File

@ -8,14 +8,14 @@ Note: This is a work in progress. It has only been tested on **Python 3.12.6**.
## Features
- Monitors Icecast stream metadata and announces track changes
- Configurable via YAML files and command line arguments
- Supports running multiple bot instances simultaneously
- Pattern-based song title filtering
- Configurable logging levels and output
- Smart URL resolution for metadata fetching
- Automatic reconnection and error recovery with status reporting
- Admin commands with permission system
- Monitors Icecast streams for metadata changes
- Announces new tracks in IRC channels
- Supports multiple IRC networks and channels
- Customizable announcement formats
- Command system with admin privileges
- Automatic reconnection on network issues
- Multiple bot instances can be managed together
- Systemd service integration
## Dependencies
@ -37,21 +37,21 @@ Create a YAML config file (default: `config.yaml`):
```yaml
irc:
host: "irc.example.net"
host: "irc.libera.chat"
port: 6667
nick: "MusicBot"
user: "musicbot"
realname: "Music Announcer Bot"
channel: "#music"
nick: "IcecastBot"
user: "icecastbot"
realname: "Icecast IRC Bot"
channel: "#yourchannel"
stream:
url: "https://stream.example.com"
endpoint: "stream"
health_check_interval: 300
url: "https://your.stream.url" # Base URL without /stream or .mp3
endpoint: "/stream" # The endpoint part (e.g. /stream, /radio.mp3)
health_check_interval: 300 # How often to check health status (in seconds)
announce:
format: "\x02Now playing:\x02 {song}"
ignore_patterns:
format: "\x02Now playing:\x02 {song}" # Format for song announcements
ignore_patterns: # Don't announce songs matching these patterns
- "Unknown"
- "Unable to fetch metadata"
- "Error fetching metadata"
@ -61,16 +61,31 @@ commands:
require_nick_prefix: false # If true, commands must be prefixed with "botname: " or "botname, "
allow_private_commands: false # If true, allows commands in private messages
help: # Help message templates
specific_format: "\x02{prefix}{cmd}\x02: {desc}" # Format for specific command help
list_format: "(\x02{cmd}\x02, {desc})" # Format for commands in list
list_separator: " | " # Separator between commands in list
admin:
users: # List of users who can use admin commands (use "*" for anyone)
- "*"
logging:
level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL
```
## Usage
## Manager Commands
The bot manager supports the following commands:
```bash
icecast-irc-bot-manager list # List running bots
icecast-irc-bot-manager start --config FILE # Start a bot
icecast-irc-bot-manager stop BOT_ID # Stop a bot
icecast-irc-bot-manager restart BOT_ID # Restart a bot
icecast-irc-bot-manager quiet BOT_ID # Disable song announcements
icecast-irc-bot-manager unquiet BOT_ID # Enable song announcements
```
### Running with Automatic Restart Support
The recommended way to run the bot is using the provided `bot.sh` script, which handles automatic restarts when using the `!restart` command:
@ -119,35 +134,22 @@ Run multiple instances with different configs:
python main.py config1.yaml config2.yaml config3.yaml
```
## IRC Commands
## Commands
Regular commands:
- `!np`: Shows the currently playing track
- `!help`: Shows available commands
The bot supports the following commands:
Admin commands:
- `!start`: Start stream monitoring
- `!stop`: Stop stream monitoring
- `!reconnect`: Reconnect to stream (with status feedback)
- `!restart`: Restart the bot (requires using bot.sh)
- `!quit`: Shutdown the bot
- `!np` - Show the currently playing song
- `!help` - Show available commands or help for a specific command
## Logging
Admin commands (only available to users listed in the `admin.users` config):
The bot supports different logging levels configurable in the config.yaml:
- DEBUG: Detailed information for troubleshooting
- INFO: General operational messages (default)
- WARNING: Warning messages and potential issues
- ERROR: Error messages only
- CRITICAL: Critical failures only
Logs include:
- Stream health status
- Command processing
- Connection status
- Error details
The bot also maintains an ERROR.log file for critical issues.
- `!start` - Start stream monitoring
- `!stop` - Stop stream monitoring
- `!quiet` - Disable song announcements but continue monitoring
- `!unquiet` - Enable song announcements
- `!reconnect` - Reconnect to the stream
- `!restart` - Restart the bot
- `!quit` - Shutdown the bot
## Error Handling

View File

@ -1 +1 @@
1.1.0
1.2.0

View File

@ -17,6 +17,7 @@ announce:
- "Unknown"
- "Unable to fetch metadata"
- "Error fetching metadata"
quiet_on_start: false # If true, bot starts in quiet mode (no announcements)
commands:
prefix: "!" # Command prefix (e.g. !np, !help)
@ -30,10 +31,4 @@ help: # Help message templates
admin:
users: # List of users who can use admin commands (use "*" for anyone)
- "*"
logging:
level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL
format: "%(asctime)s - %(levelname)s - %(message)s" # Format for console logs
error_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # Format for error logs
datefmt: "%H:%M:%S" # Date/time format for log timestamps
- "*"

View File

@ -3,93 +3,368 @@
import argparse
import asyncio
import json
import logging
import os
import signal
import sys
import tempfile
import time
from pathlib import Path
from typing import Dict, List, Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('icecast-irc-bot-manager')
# ANSI color codes for terminal output if needed
class Colors:
COLORS = [
'\033[94m', # BLUE
'\033[92m', # GREEN
'\033[95m', # MAGENTA
'\033[93m', # YELLOW
'\033[96m', # CYAN
'\033[91m', # RED
'\033[38;5;208m', # ORANGE
'\033[38;5;165m', # PURPLE
'\033[38;5;39m', # DEEP BLUE
'\033[38;5;82m', # LIME
]
ENDC = '\033[0m'
BOLD = '\033[1m'
# Additional colors for output
GREY = '\033[37m'
WHITE = '\033[97m'
RED = '\033[91m'
ORANGE = '\033[38;5;208m' # Using 256-color code for orange
CYAN = '\033[96m'
MAGENTA = '\033[95m'
class BotManager:
"""Manages multiple Icecast IRC bot instances."""
def __init__(self):
self.bots: Dict[str, asyncio.subprocess.Process] = {}
self.config_dir = Path('/etc/icecast-irc-bot')
self.config_dir = Path('.') # Use current directory instead of /etc/icecast-irc-bot
self.socket_dir = Path(tempfile.gettempdir())
self.state_file = Path(tempfile.gettempdir()) / 'icecast-irc-bots.json'
self.monitor_task = None
self.venv_python = os.getenv('VIRTUAL_ENV', '/opt/icecast-irc-bot/venv') + '/bin/python3'
# If venv_python doesn't exist, use the current Python interpreter
if not os.path.exists(self.venv_python):
self.venv_python = sys.executable
async def start_bot(self, config_path: Path) -> bool:
"""Start a bot instance with the given config.
def _save_state(self):
"""Save the current state of running bots."""
try:
# Load existing state
existing_state = self._load_state()
# Update with current bots
current_state = {
bot_id: {
'pid': process.pid,
'config': str(self.config_dir / f"{bot_id}.yaml")
}
for bot_id, process in self.bots.items()
}
# Merge states, with current bots taking precedence
merged_state = {**existing_state, **current_state}
# Save merged state
with open(self.state_file, 'w') as f:
json.dump(merged_state, f)
print(f"Saved state with {len(merged_state)} bots")
except Exception as e:
print(f"Error saving state: {e}")
pass
def _load_state(self):
"""Load the state of running bots."""
try:
if self.state_file.exists():
with open(self.state_file, 'r') as f:
state = json.load(f)
return state
return {}
except Exception:
return {}
async def monitor_processes(self):
"""Monitor running bot processes and clean up dead ones."""
while True:
try:
for bot_id in list(self.bots.keys()):
process = self.bots[bot_id]
try:
# Check if process exists in system
os.kill(process.pid, 0)
# Check if process has terminated
if process.returncode is not None:
await self._cleanup_process(process)
del self.bots[bot_id]
except ProcessLookupError:
await self._cleanup_process(process)
del self.bots[bot_id]
except Exception:
pass
except Exception:
pass
await asyncio.sleep(5) # Check every 5 seconds
async def _cleanup_process(self, process: asyncio.subprocess.Process):
"""Clean up a process and its resources.
Args:
config_path: Path to the bot's config file
process: The process to clean up
"""
try:
print(f"Cleaning up process with PID {process.pid}")
# Wait for the process to finish if it hasn't
if process.returncode is None:
try:
print(f"Terminating process {process.pid}")
process.terminate()
await asyncio.wait_for(process.wait(), timeout=5.0)
except (asyncio.TimeoutError, ProcessLookupError) as e:
print(f"Error terminating process: {e}")
try:
print(f"Killing process {process.pid}")
process.kill()
await process.wait()
except ProcessLookupError as e:
print(f"Process already gone: {e}")
pass
# Drain any remaining output to prevent deadlocks
if process.stdout:
try:
data = await process.stdout.read()
if data:
print(f"Remaining stdout: {data.decode().strip()}")
except (ValueError, IOError) as e:
print(f"Error reading stdout: {e}")
pass
if process.stderr:
try:
data = await process.stderr.read()
if data:
print(f"Remaining stderr: {data.decode().strip()}")
except (ValueError, IOError) as e:
print(f"Error reading stderr: {e}")
pass
except Exception as e:
print(f"Exception during cleanup: {e}")
pass
async def start_bot(self, config_path: Path) -> bool:
"""Start a new bot instance with the given config file.
Args:
config_path: Path to the config file
Returns:
bool: True if bot was started successfully
"""
try:
# Create unique name for this bot instance
# Generate a unique ID for this bot based on the config file name
bot_id = config_path.stem
# Check if bot is already running
if bot_id in self.bots:
logger.warning(f"Bot {bot_id} is already running")
return False
# Check if a bot with this ID is already running
state = self._load_state()
if bot_id in state:
try:
pid = state[bot_id]['pid']
os.kill(pid, 0) # Check if process exists
return False
except ProcessLookupError:
# Process doesn't exist, remove from state
pass
# Start the bot process
# Start the bot process using venv Python
process = await asyncio.create_subprocess_exec(
sys.executable, '-m', 'main',
self.venv_python, 'main.py',
'--config', str(config_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.bots[bot_id] = process
logger.info(f"Started bot {bot_id} (PID: {process.pid})")
return True
# Verify process started successfully
try:
os.kill(process.pid, 0)
except ProcessLookupError:
await self._cleanup_process(process)
return False
except Exception as e:
logger.error(f"Failed to start bot with config {config_path}: {e}")
# Start the monitor task if not already running
if self.monitor_task is None:
self.monitor_task = asyncio.create_task(self.monitor_processes())
# Monitor the process output for a short time to ensure it starts properly
try:
startup_timeout = 5.0 # Give it 5 seconds to start
start_time = time.time()
success = False
while time.time() - start_time < startup_timeout:
# Check if process has exited
if process.returncode is not None:
await self._cleanup_process(process)
return False
# Read any available output during startup
try:
line = await asyncio.wait_for(process.stderr.readline(), timeout=0.1)
if line:
line = line.decode().strip()
print(f"Bot output: {line}")
# Consider the bot started successfully after a short delay
# instead of looking for specific output
success = True
except asyncio.TimeoutError:
# No output available, continue monitoring
continue
except Exception as e:
print(f"Error reading bot output: {e}")
pass
# Consider the bot started successfully if it's still running after the timeout
if not success and process.returncode is None:
success = True
# Start background task to monitor process output
async def monitor_output(process, bot_id):
try:
while True:
if process.returncode is not None:
break
try:
line = await process.stderr.readline()
if not line:
break
# Print the bot's output
print(f"Bot {bot_id} output: {line.decode().strip()}")
except Exception as e:
print(f"Error monitoring bot {bot_id}: {e}")
break
except Exception as e:
print(f"Monitor task exception for bot {bot_id}: {e}")
pass
asyncio.create_task(monitor_output(process, bot_id))
# Store the process
self.bots[bot_id] = process
self._save_state()
return True
except Exception:
await self._cleanup_process(process)
return False
except Exception:
return False
async def stop_bot(self, bot_id: str) -> bool:
"""Stop a running bot instance.
Args:
bot_id: ID of the bot to stop, or "all" to stop all bots
Returns:
bool: True if bot(s) were stopped successfully
"""
if bot_id == "all":
print("Stopping all bots...")
success = True
state = self._load_state()
for bid in list(state.keys()):
print(f"Stopping bot {bid}...")
if not await self._stop_single_bot(bid):
success = False
return success
# Stop a single bot
return await self._stop_single_bot(bot_id)
async def _stop_single_bot(self, bot_id: str) -> bool:
"""Stop a single bot instance.
Args:
bot_id: ID of the bot to stop
Returns:
bool: True if bot was stopped successfully
"""
if bot_id not in self.bots:
logger.warning(f"Bot {bot_id} is not running")
return False
try:
# Check both local bots and state file
state = self._load_state()
process = None
if bot_id in self.bots:
process = self.bots[bot_id]
process.terminate()
elif bot_id in state:
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
process.kill()
await process.wait()
del self.bots[bot_id]
logger.info(f"Stopped bot {bot_id}")
return True
except Exception as e:
logger.error(f"Failed to stop bot {bot_id}: {e}")
pid = state[bot_id]['pid']
print(f"Stopping bot {bot_id} with PID {pid}")
# Try to kill the process
try:
# First try a gentle termination
os.kill(pid, signal.SIGTERM)
print(f"Sent SIGTERM to process {pid}")
# Wait a bit for the process to terminate
for i in range(50): # 5 seconds
await asyncio.sleep(0.1)
try:
os.kill(pid, 0) # Check if process exists
except ProcessLookupError:
print(f"Process {pid} terminated successfully")
break
else:
# Process didn't terminate, force kill
print(f"Process {pid} didn't terminate, sending SIGKILL")
try:
os.kill(pid, signal.SIGKILL)
print(f"Sent SIGKILL to process {pid}")
except ProcessLookupError:
print(f"Process {pid} already terminated")
pass
except ProcessLookupError:
print(f"Process {pid} not found")
pass
# Remove only this bot from state
if bot_id in state:
print(f"Removing {bot_id} from state file")
del state[bot_id]
with open(self.state_file, 'w') as f:
json.dump(state, f)
print(f"State file updated, remaining bots: {list(state.keys())}")
return True
except Exception as e:
print(f"Error stopping bot {bot_id}: {e}")
return False
else:
print(f"Bot {bot_id} not found")
return False
if process:
try:
print(f"Cleaning up process for bot {bot_id}")
await self._cleanup_process(process)
del self.bots[bot_id]
# Update state file - only remove this bot
state = self._load_state()
if bot_id in state:
print(f"Removing {bot_id} from state file")
del state[bot_id]
with open(self.state_file, 'w') as f:
json.dump(state, f)
print(f"State file updated, remaining bots: {list(state.keys())}")
return True
except Exception as e:
print(f"Error cleaning up process for bot {bot_id}: {e}")
return False
async def restart_bot(self, bot_id: str) -> bool:
"""Restart a running bot instance.
@ -103,7 +378,6 @@ class BotManager:
# Find the config file for this bot
config_path = self.config_dir / f"{bot_id}.yaml"
if not config_path.exists():
logger.error(f"Config file not found for bot {bot_id}")
return False
# Stop the bot if it's running
@ -114,81 +388,267 @@ class BotManager:
# Start the bot with the same config
return await self.start_bot(config_path)
async def list_bots(self) -> List[Dict]:
"""List all running bot instances.
async def quiet_bot(self, bot_id: str) -> bool:
"""Disable song announcements for a running bot.
Args:
bot_id: ID of the bot to quiet
Returns:
bool: True if command was sent successfully
"""
# Check if bot exists in state
state = self._load_state()
if bot_id not in state:
print(f"Bot {bot_id} not found")
return False
try:
# Get the socket path for this bot
socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock"
if not socket_path.exists():
print(f"Socket for bot {bot_id} not found at {socket_path}")
return False
# Send quiet command
print(f"Sending quiet command to bot {bot_id}")
reader, writer = await asyncio.open_unix_connection(str(socket_path))
writer.write(b'quiet')
await writer.drain()
writer.close()
await writer.wait_closed()
print(f"Quiet command sent to bot {bot_id}")
return True
except Exception as e:
print(f"Error sending quiet command to bot {bot_id}: {e}")
return False
async def unquiet_bot(self, bot_id: str) -> bool:
"""Enable song announcements for a running bot.
Args:
bot_id: ID of the bot to unquiet
Returns:
bool: True if command was sent successfully
"""
# Check if bot exists in state
state = self._load_state()
if bot_id not in state:
print(f"Bot {bot_id} not found")
return False
try:
# Get the socket path for this bot
socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock"
if not socket_path.exists():
print(f"Socket for bot {bot_id} not found at {socket_path}")
return False
# Send unquiet command
print(f"Sending unquiet command to bot {bot_id}")
reader, writer = await asyncio.open_unix_connection(str(socket_path))
writer.write(b'unquiet')
await writer.drain()
writer.close()
await writer.wait_closed()
print(f"Unquiet command sent to bot {bot_id}")
return True
except Exception as e:
print(f"Error sending unquiet command to bot {bot_id}: {e}")
return False
async def list_bots(self) -> bool:
"""List all running bots.
Returns:
List[Dict]: List of bot info dictionaries
bool: True if any bots are running
"""
bot_info = []
for bot_id, process in self.bots.items():
info = {
'id': bot_id,
'pid': process.pid,
'running': process.returncode is None
}
bot_info.append(info)
return bot_info
state = self._load_state()
# Check if any bots are running
if not state:
print("No bots running")
return False
# Track unique PIDs to avoid duplicates
seen_pids = set()
unique_bots = {}
# Filter out duplicates based on PID
for bot_id, info in state.items():
pid = info.get('pid')
if pid and pid not in seen_pids:
seen_pids.add(pid)
unique_bots[bot_id] = info
if not unique_bots:
print("No bots running")
return False
# Print header
print("\nRunning Bots:")
print("-" * 80)
print(f"{'ID':<20} {'PID':<8} {'Status':<10} {'Command':<40}")
print("-" * 80)
# Print each bot's status
for bot_id, info in unique_bots.items():
pid = info.get('pid')
config = info.get('config', '')
# Check if process is still running
try:
os.kill(pid, 0)
status = "running"
except ProcessLookupError:
status = "stopped"
continue # Skip stopped processes
except Exception:
status = "unknown"
# Get command line
try:
with open(f"/proc/{pid}/cmdline", 'rb') as f:
cmdline = f.read().replace(b'\0', b' ').decode()
except Exception:
cmdline = f"Unknown (PID: {pid})"
print(f"{bot_id:<20} {pid:<8} {status:<10} {cmdline:<40}")
print("-" * 80)
print()
return True
async def cleanup(self):
"""Clean up all running bots."""
for bot_id in list(self.bots.keys()):
await self.stop_bot(bot_id)
if self.monitor_task:
self.monitor_task.cancel()
try:
await self.monitor_task
except asyncio.CancelledError:
pass
await self.stop_bot("all")
async def main():
parser = argparse.ArgumentParser(description='Icecast IRC Bot Manager')
parser.add_argument('--config', type=str, help='Path to config file or directory')
parser.add_argument('command', choices=['start', 'stop', 'restart', 'list'], help='Command to execute')
parser.add_argument('bot_id', nargs='?', help='Bot ID for start/stop/restart commands')
parser.add_argument('command', choices=['start', 'stop', 'restart', 'list', 'quiet', 'unquiet'], help='Command to execute')
parser.add_argument('bot_id', nargs='?', help='Bot ID for start/stop/restart/quiet/unquiet commands, or "all" to stop all bots')
args = parser.parse_args()
manager = BotManager()
should_cleanup = False # Only cleanup for certain commands
try:
if args.command == 'list':
bot_info = await manager.list_bots()
if bot_info:
print(json.dumps(bot_info, indent=2))
else:
print("No bots running")
if await manager.list_bots():
# If list_bots returns True, we've printed the list
pass
elif args.command == 'start':
should_cleanup = True # Need cleanup for start command
if not args.config:
print("Error: --config required for start command")
sys.exit(1)
config_path = Path(args.config)
if config_path.is_dir():
# Start all bots in directory
success = True
for config_file in config_path.glob('*.yaml'):
await manager.start_bot(config_file)
if not await manager.start_bot(config_file):
success = False
if not success:
sys.exit(1)
else:
# Start single bot
await manager.start_bot(config_path)
if not await manager.start_bot(config_path):
sys.exit(1)
# If we started any bots successfully, keep running until interrupted
if manager.bots:
try:
# Keep the manager running
while True:
await asyncio.sleep(1)
if not manager.bots:
break
except asyncio.CancelledError:
pass
elif args.command == 'stop':
# Don't need cleanup for stop command as it already cleans up
should_cleanup = False
if not args.bot_id:
print("Error: bot_id required for stop command")
print("Error: bot_id required for stop command (use 'all' to stop all bots)")
sys.exit(1)
if not await manager.stop_bot(args.bot_id):
sys.exit(1)
await manager.stop_bot(args.bot_id)
elif args.command == 'restart':
should_cleanup = True # Need cleanup for restart command
if not args.bot_id:
print("Error: bot_id required for restart command")
sys.exit(1)
await manager.restart_bot(args.bot_id)
if args.bot_id == "all":
print("Error: restart all is not supported")
sys.exit(1)
if not await manager.restart_bot(args.bot_id):
sys.exit(1)
# If we restarted successfully, keep running until interrupted
if manager.bots:
try:
# Keep the manager running
while True:
await asyncio.sleep(1)
if not manager.bots:
break
except asyncio.CancelledError:
pass
elif args.command == 'quiet':
should_cleanup = False # Don't need cleanup for quiet command
if not args.bot_id:
print("Error: bot_id required for quiet command")
sys.exit(1)
if args.bot_id == "all":
print("Error: quiet all is not supported")
sys.exit(1)
if not await manager.quiet_bot(args.bot_id):
sys.exit(1)
elif args.command == 'unquiet':
should_cleanup = False # Don't need cleanup for unquiet command
if not args.bot_id:
print("Error: bot_id required for unquiet command")
sys.exit(1)
if args.bot_id == "all":
print("Error: unquiet all is not supported")
sys.exit(1)
if not await manager.unquiet_bot(args.bot_id):
sys.exit(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
await manager.cleanup()
except Exception as e:
logger.error(f"Unhandled error: {e}")
should_cleanup = True # Need cleanup for keyboard interrupt
except Exception:
should_cleanup = True # Need cleanup for errors
sys.exit(1)
finally:
# Only clean up if we need to
if should_cleanup:
await manager.cleanup()
if __name__ == '__main__':
# Set up signal handlers
# Set up signal handlers for graceful shutdown
def signal_handler(sig, frame):
# Don't exit immediately, let the cleanup happen
asyncio.get_event_loop().stop()
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, lambda s, f: sys.exit(0))
signal.signal(sig, signal_handler)
# Run the manager
asyncio.run(main())

View File

@ -13,8 +13,8 @@ Environment=PATH=/opt/icecast-irc-bot/venv/bin:$PATH
ExecStart=/opt/icecast-irc-bot/venv/bin/python3 /usr/local/bin/icecast-irc-bot-manager --config /etc/icecast-irc-bot/config.yaml
Restart=on-failure
RestartSec=5s
StandardOutput=append:/var/log/icecast-irc-bot/bot.log
StandardError=append:/var/log/icecast-irc-bot/error.log
StandardOutput=journal
StandardError=journal
# Security settings
NoNewPrivileges=yes

View File

@ -68,4 +68,6 @@ echo "Usage:"
echo "icecast-irc-bot-manager list # List running bots"
echo "icecast-irc-bot-manager start --config FILE # Start a bot"
echo "icecast-irc-bot-manager stop BOT_ID # Stop a bot"
echo "icecast-irc-bot-manager restart BOT_ID # Restart a bot"
echo "icecast-irc-bot-manager restart BOT_ID # Restart a bot"
echo "icecast-irc-bot-manager quiet BOT_ID # Disable song announcements"
echo "icecast-irc-bot-manager unquiet BOT_ID # Enable song announcements"

121
logger.py Normal file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Logger module for the Icecast metadata IRC announcer.
Uses Loguru for simple yet powerful logging.
"""
import os
import sys
from pathlib import Path
from loguru import logger
class LogManager:
"""
Manages logging configuration for the Icecast metadata IRC announcer.
This class provides a simple interface for configuring and using Loguru
throughout the project. It handles log file rotation, formatting, and
different log levels.
"""
def __init__(self, config=None):
"""
Initialize the LogManager with optional configuration.
Args:
config: Optional dictionary with logging configuration.
If None, default configuration is used.
"""
self.config = config or {}
self.log_dir = Path(self.config.get('log_dir', 'logs'))
self.log_level = self.config.get('level', 'INFO').upper()
self.log_format = self.config.get('format',
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
# Create log directory if it doesn't exist
if not self.log_dir.exists():
self.log_dir.mkdir(parents=True, exist_ok=True)
# Configure logger
self._configure_logger()
def _configure_logger(self):
"""Configure Loguru logger with appropriate sinks and formats."""
# Remove default handler
logger.remove()
# Add console handler
logger.add(
sys.stderr,
format=self.log_format,
level=self.log_level,
colorize=True
)
# Add file handler with rotation
log_file = self.log_dir / "icecast_bot.log"
logger.add(
str(log_file),
format=self.log_format,
level=self.log_level,
rotation="10 MB", # Rotate when file reaches 10MB
compression="zip", # Compress rotated logs
retention="1 week", # Keep logs for 1 week
backtrace=True, # Include backtrace in error logs
diagnose=True # Include variables in error logs
)
def get_logger(self, name=None):
"""
Get a logger instance with the given name.
Args:
name: Optional name for the logger. If None, the calling module's name is used.
Returns:
A Loguru logger instance.
"""
if name:
return logger.bind(name=name)
return logger
@staticmethod
def set_level(level):
"""
Set the log level for all handlers.
Args:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
for handler_id in logger._core.handlers:
logger.configure(handler_id, level=level)
# Create a default instance for easy import
log_manager = LogManager()
get_logger = log_manager.get_logger
# Export the logger directly for simple usage
log = logger
# For convenience, export common log levels
debug = logger.debug
info = logger.info
warning = logger.warning
error = logger.error
critical = logger.critical
exception = logger.exception
# Example usage:
# from logger import log, debug, info, error
# log.info("This is an info message")
# debug("This is a debug message")
#
# # Or with a named logger:
# from logger import get_logger
# logger = get_logger("my_module")
# logger.info("This is a message from my_module")

877
main.py Normal file → Executable file

File diff suppressed because it is too large Load Diff

View File

@ -16,6 +16,11 @@ requires-python = ">=3.11"
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.version]
source = "regex"
path = "VERSION"
pattern = "^(?P<version>[0-9]+\\.[0-9]+\\.[0-9]+)$"
[tool.hatch.build.targets.wheel]
packages = ["."]

View File

@ -1,3 +1,4 @@
asif
aiohttp
pyyaml
loguru>=0.7.0