Compare commits

..

No commits in common. "96acbc127b6cc327bdc9dababa8a2fd6ccf3b3bd" and "252fdf4db105f98f0b5eae05c5fd0028e574e5fc" have entirely different histories.

11 changed files with 535 additions and 1214 deletions

View File

@ -7,25 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [1.2.0] - 2024-07-01
### Added
- New admin commands:
- !quiet - Disable song announcements while continuing to monitor the stream
- !unquiet - Re-enable song announcements
- Terminal commands for controlling announcement behavior:
- `quiet` - Disable song announcements via terminal
- `unquiet` - Enable song announcements via terminal
- Configuration option `quiet_on_start` to control whether the bot starts with announcements enabled or disabled
- Background task to monitor for quiet/unquiet requests from terminal commands
- Updated documentation in README.md with new commands
- Updated usage information in install.sh
### Changed
- Enhanced the RestartManager to handle quiet and unquiet commands
- Improved the BotManager class to support the new terminal commands
- Updated configuration example to include the new quiet_on_start option
## [1.0.1] - 2024-02-24 ## [1.0.1] - 2024-02-24
### Added ### Added
@ -75,7 +56,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Multi-bot support - Multi-bot support
- Configuration via YAML files - Configuration via YAML files
[Unreleased]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.2.0...HEAD [Unreleased]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.1...HEAD
[1.2.0]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.1...v1.2.0
[1.0.1]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.0...v1.0.1 [1.0.1]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.0...v1.0.1
[1.0.0]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases/tag/v1.0.0 [1.0.0]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases/tag/v1.0.0

View File

@ -8,14 +8,14 @@ Note: This is a work in progress. It has only been tested on **Python 3.12.6**.
## Features ## Features
- Monitors Icecast streams for metadata changes - Monitors Icecast stream metadata and announces track changes
- Announces new tracks in IRC channels - Configurable via YAML files and command line arguments
- Supports multiple IRC networks and channels - Supports running multiple bot instances simultaneously
- Customizable announcement formats - Pattern-based song title filtering
- Command system with admin privileges - Configurable logging levels and output
- Automatic reconnection on network issues - Smart URL resolution for metadata fetching
- Multiple bot instances can be managed together - Automatic reconnection and error recovery with status reporting
- Systemd service integration - Admin commands with permission system
## Dependencies ## Dependencies
@ -37,21 +37,21 @@ Create a YAML config file (default: `config.yaml`):
```yaml ```yaml
irc: irc:
host: "irc.libera.chat" host: "irc.example.net"
port: 6667 port: 6667
nick: "IcecastBot" nick: "MusicBot"
user: "icecastbot" user: "musicbot"
realname: "Icecast IRC Bot" realname: "Music Announcer Bot"
channel: "#yourchannel" channel: "#music"
stream: stream:
url: "https://your.stream.url" # Base URL without /stream or .mp3 url: "https://stream.example.com"
endpoint: "/stream" # The endpoint part (e.g. /stream, /radio.mp3) endpoint: "stream"
health_check_interval: 300 # How often to check health status (in seconds) health_check_interval: 300
announce: announce:
format: "\x02Now playing:\x02 {song}" # Format for song announcements format: "\x02Now playing:\x02 {song}"
ignore_patterns: # Don't announce songs matching these patterns ignore_patterns:
- "Unknown" - "Unknown"
- "Unable to fetch metadata" - "Unable to fetch metadata"
- "Error fetching metadata" - "Error fetching metadata"
@ -61,31 +61,16 @@ commands:
require_nick_prefix: false # If true, commands must be prefixed with "botname: " or "botname, " require_nick_prefix: false # If true, commands must be prefixed with "botname: " or "botname, "
allow_private_commands: false # If true, allows commands in private messages allow_private_commands: false # If true, allows commands in private messages
help: # Help message templates
specific_format: "\x02{prefix}{cmd}\x02: {desc}" # Format for specific command help
list_format: "(\x02{cmd}\x02, {desc})" # Format for commands in list
list_separator: " | " # Separator between commands in list
admin: admin:
users: # List of users who can use admin commands (use "*" for anyone) users: # List of users who can use admin commands (use "*" for anyone)
- "*" - "*"
logging:
level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL
``` ```
## Usage ## Usage
## Manager Commands
The bot manager supports the following commands:
```bash
icecast-irc-bot-manager list # List running bots
icecast-irc-bot-manager start --config FILE # Start a bot
icecast-irc-bot-manager stop BOT_ID # Stop a bot
icecast-irc-bot-manager restart BOT_ID # Restart a bot
icecast-irc-bot-manager quiet BOT_ID # Disable song announcements
icecast-irc-bot-manager unquiet BOT_ID # Enable song announcements
```
### Running with Automatic Restart Support ### Running with Automatic Restart Support
The recommended way to run the bot is using the provided `bot.sh` script, which handles automatic restarts when using the `!restart` command: The recommended way to run the bot is using the provided `bot.sh` script, which handles automatic restarts when using the `!restart` command:
@ -134,22 +119,35 @@ Run multiple instances with different configs:
python main.py config1.yaml config2.yaml config3.yaml python main.py config1.yaml config2.yaml config3.yaml
``` ```
## Commands ## IRC Commands
The bot supports the following commands: Regular commands:
- `!np`: Shows the currently playing track
- `!help`: Shows available commands
- `!np` - Show the currently playing song Admin commands:
- `!help` - Show available commands or help for a specific command - `!start`: Start stream monitoring
- `!stop`: Stop stream monitoring
- `!reconnect`: Reconnect to stream (with status feedback)
- `!restart`: Restart the bot (requires using bot.sh)
- `!quit`: Shutdown the bot
Admin commands (only available to users listed in the `admin.users` config): ## Logging
- `!start` - Start stream monitoring The bot supports different logging levels configurable in the config.yaml:
- `!stop` - Stop stream monitoring - DEBUG: Detailed information for troubleshooting
- `!quiet` - Disable song announcements but continue monitoring - INFO: General operational messages (default)
- `!unquiet` - Enable song announcements - WARNING: Warning messages and potential issues
- `!reconnect` - Reconnect to the stream - ERROR: Error messages only
- `!restart` - Restart the bot - CRITICAL: Critical failures only
- `!quit` - Shutdown the bot
Logs include:
- Stream health status
- Command processing
- Connection status
- Error details
The bot also maintains an ERROR.log file for critical issues.
## Error Handling ## Error Handling

View File

@ -1 +1 @@
1.2.0 1.1.0

View File

@ -17,7 +17,6 @@ announce:
- "Unknown" - "Unknown"
- "Unable to fetch metadata" - "Unable to fetch metadata"
- "Error fetching metadata" - "Error fetching metadata"
quiet_on_start: false # If true, bot starts in quiet mode (no announcements)
commands: commands:
prefix: "!" # Command prefix (e.g. !np, !help) prefix: "!" # Command prefix (e.g. !np, !help)
@ -31,4 +30,10 @@ help: # Help message templates
admin: admin:
users: # List of users who can use admin commands (use "*" for anyone) users: # List of users who can use admin commands (use "*" for anyone)
- "*" - "*"
logging:
level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL
format: "%(asctime)s - %(levelname)s - %(message)s" # Format for console logs
error_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # Format for error logs
datefmt: "%H:%M:%S" # Date/time format for log timestamps

View File

@ -3,368 +3,93 @@
import argparse import argparse
import asyncio import asyncio
import json import json
import logging
import os import os
import signal import signal
import sys import sys
import tempfile import tempfile
import time
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional
# ANSI color codes for terminal output if needed # Configure logging
class Colors: logging.basicConfig(
COLORS = [ level=logging.INFO,
'\033[94m', # BLUE format='%(asctime)s - %(levelname)s - %(message)s',
'\033[92m', # GREEN datefmt='%Y-%m-%d %H:%M:%S'
'\033[95m', # MAGENTA )
'\033[93m', # YELLOW logger = logging.getLogger('icecast-irc-bot-manager')
'\033[96m', # CYAN
'\033[91m', # RED
'\033[38;5;208m', # ORANGE
'\033[38;5;165m', # PURPLE
'\033[38;5;39m', # DEEP BLUE
'\033[38;5;82m', # LIME
]
ENDC = '\033[0m'
BOLD = '\033[1m'
# Additional colors for output
GREY = '\033[37m'
WHITE = '\033[97m'
RED = '\033[91m'
ORANGE = '\033[38;5;208m' # Using 256-color code for orange
CYAN = '\033[96m'
MAGENTA = '\033[95m'
class BotManager: class BotManager:
"""Manages multiple Icecast IRC bot instances.""" """Manages multiple Icecast IRC bot instances."""
def __init__(self): def __init__(self):
self.bots: Dict[str, asyncio.subprocess.Process] = {} self.bots: Dict[str, asyncio.subprocess.Process] = {}
self.config_dir = Path('.') # Use current directory instead of /etc/icecast-irc-bot self.config_dir = Path('/etc/icecast-irc-bot')
self.socket_dir = Path(tempfile.gettempdir()) self.socket_dir = Path(tempfile.gettempdir())
self.state_file = Path(tempfile.gettempdir()) / 'icecast-irc-bots.json'
self.monitor_task = None
self.venv_python = os.getenv('VIRTUAL_ENV', '/opt/icecast-irc-bot/venv') + '/bin/python3'
# If venv_python doesn't exist, use the current Python interpreter
if not os.path.exists(self.venv_python):
self.venv_python = sys.executable
def _save_state(self):
"""Save the current state of running bots."""
try:
# Load existing state
existing_state = self._load_state()
# Update with current bots
current_state = {
bot_id: {
'pid': process.pid,
'config': str(self.config_dir / f"{bot_id}.yaml")
}
for bot_id, process in self.bots.items()
}
# Merge states, with current bots taking precedence
merged_state = {**existing_state, **current_state}
# Save merged state
with open(self.state_file, 'w') as f:
json.dump(merged_state, f)
print(f"Saved state with {len(merged_state)} bots")
except Exception as e:
print(f"Error saving state: {e}")
pass
def _load_state(self):
"""Load the state of running bots."""
try:
if self.state_file.exists():
with open(self.state_file, 'r') as f:
state = json.load(f)
return state
return {}
except Exception:
return {}
async def monitor_processes(self):
"""Monitor running bot processes and clean up dead ones."""
while True:
try:
for bot_id in list(self.bots.keys()):
process = self.bots[bot_id]
try:
# Check if process exists in system
os.kill(process.pid, 0)
# Check if process has terminated
if process.returncode is not None:
await self._cleanup_process(process)
del self.bots[bot_id]
except ProcessLookupError:
await self._cleanup_process(process)
del self.bots[bot_id]
except Exception:
pass
except Exception:
pass
await asyncio.sleep(5) # Check every 5 seconds
async def _cleanup_process(self, process: asyncio.subprocess.Process):
"""Clean up a process and its resources.
Args:
process: The process to clean up
"""
try:
print(f"Cleaning up process with PID {process.pid}")
# Wait for the process to finish if it hasn't
if process.returncode is None:
try:
print(f"Terminating process {process.pid}")
process.terminate()
await asyncio.wait_for(process.wait(), timeout=5.0)
except (asyncio.TimeoutError, ProcessLookupError) as e:
print(f"Error terminating process: {e}")
try:
print(f"Killing process {process.pid}")
process.kill()
await process.wait()
except ProcessLookupError as e:
print(f"Process already gone: {e}")
pass
# Drain any remaining output to prevent deadlocks
if process.stdout:
try:
data = await process.stdout.read()
if data:
print(f"Remaining stdout: {data.decode().strip()}")
except (ValueError, IOError) as e:
print(f"Error reading stdout: {e}")
pass
if process.stderr:
try:
data = await process.stderr.read()
if data:
print(f"Remaining stderr: {data.decode().strip()}")
except (ValueError, IOError) as e:
print(f"Error reading stderr: {e}")
pass
except Exception as e:
print(f"Exception during cleanup: {e}")
pass
async def start_bot(self, config_path: Path) -> bool: async def start_bot(self, config_path: Path) -> bool:
"""Start a new bot instance with the given config file. """Start a bot instance with the given config.
Args: Args:
config_path: Path to the config file config_path: Path to the bot's config file
Returns: Returns:
bool: True if bot was started successfully bool: True if bot was started successfully
""" """
try: try:
# Generate a unique ID for this bot based on the config file name # Create unique name for this bot instance
bot_id = config_path.stem bot_id = config_path.stem
# Check if a bot with this ID is already running # Check if bot is already running
state = self._load_state() if bot_id in self.bots:
if bot_id in state: logger.warning(f"Bot {bot_id} is already running")
try: return False
pid = state[bot_id]['pid']
os.kill(pid, 0) # Check if process exists
return False
except ProcessLookupError:
# Process doesn't exist, remove from state
pass
# Start the bot process using venv Python # Start the bot process
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
self.venv_python, 'main.py', sys.executable, '-m', 'main',
'--config', str(config_path), '--config', str(config_path),
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE stderr=asyncio.subprocess.PIPE
) )
# Verify process started successfully self.bots[bot_id] = process
try: logger.info(f"Started bot {bot_id} (PID: {process.pid})")
os.kill(process.pid, 0) return True
except ProcessLookupError:
await self._cleanup_process(process)
return False
# Start the monitor task if not already running except Exception as e:
if self.monitor_task is None: logger.error(f"Failed to start bot with config {config_path}: {e}")
self.monitor_task = asyncio.create_task(self.monitor_processes())
# Monitor the process output for a short time to ensure it starts properly
try:
startup_timeout = 5.0 # Give it 5 seconds to start
start_time = time.time()
success = False
while time.time() - start_time < startup_timeout:
# Check if process has exited
if process.returncode is not None:
await self._cleanup_process(process)
return False
# Read any available output during startup
try:
line = await asyncio.wait_for(process.stderr.readline(), timeout=0.1)
if line:
line = line.decode().strip()
print(f"Bot output: {line}")
# Consider the bot started successfully after a short delay
# instead of looking for specific output
success = True
except asyncio.TimeoutError:
# No output available, continue monitoring
continue
except Exception as e:
print(f"Error reading bot output: {e}")
pass
# Consider the bot started successfully if it's still running after the timeout
if not success and process.returncode is None:
success = True
# Start background task to monitor process output
async def monitor_output(process, bot_id):
try:
while True:
if process.returncode is not None:
break
try:
line = await process.stderr.readline()
if not line:
break
# Print the bot's output
print(f"Bot {bot_id} output: {line.decode().strip()}")
except Exception as e:
print(f"Error monitoring bot {bot_id}: {e}")
break
except Exception as e:
print(f"Monitor task exception for bot {bot_id}: {e}")
pass
asyncio.create_task(monitor_output(process, bot_id))
# Store the process
self.bots[bot_id] = process
self._save_state()
return True
except Exception:
await self._cleanup_process(process)
return False
except Exception:
return False return False
async def stop_bot(self, bot_id: str) -> bool: async def stop_bot(self, bot_id: str) -> bool:
"""Stop a running bot instance. """Stop a running bot instance.
Args:
bot_id: ID of the bot to stop, or "all" to stop all bots
Returns:
bool: True if bot(s) were stopped successfully
"""
if bot_id == "all":
print("Stopping all bots...")
success = True
state = self._load_state()
for bid in list(state.keys()):
print(f"Stopping bot {bid}...")
if not await self._stop_single_bot(bid):
success = False
return success
# Stop a single bot
return await self._stop_single_bot(bot_id)
async def _stop_single_bot(self, bot_id: str) -> bool:
"""Stop a single bot instance.
Args: Args:
bot_id: ID of the bot to stop bot_id: ID of the bot to stop
Returns: Returns:
bool: True if bot was stopped successfully bool: True if bot was stopped successfully
""" """
# Check both local bots and state file if bot_id not in self.bots:
state = self._load_state() logger.warning(f"Bot {bot_id} is not running")
process = None
if bot_id in self.bots:
process = self.bots[bot_id]
elif bot_id in state:
try:
pid = state[bot_id]['pid']
print(f"Stopping bot {bot_id} with PID {pid}")
# Try to kill the process
try:
# First try a gentle termination
os.kill(pid, signal.SIGTERM)
print(f"Sent SIGTERM to process {pid}")
# Wait a bit for the process to terminate
for i in range(50): # 5 seconds
await asyncio.sleep(0.1)
try:
os.kill(pid, 0) # Check if process exists
except ProcessLookupError:
print(f"Process {pid} terminated successfully")
break
else:
# Process didn't terminate, force kill
print(f"Process {pid} didn't terminate, sending SIGKILL")
try:
os.kill(pid, signal.SIGKILL)
print(f"Sent SIGKILL to process {pid}")
except ProcessLookupError:
print(f"Process {pid} already terminated")
pass
except ProcessLookupError:
print(f"Process {pid} not found")
pass
# Remove only this bot from state
if bot_id in state:
print(f"Removing {bot_id} from state file")
del state[bot_id]
with open(self.state_file, 'w') as f:
json.dump(state, f)
print(f"State file updated, remaining bots: {list(state.keys())}")
return True
except Exception as e:
print(f"Error stopping bot {bot_id}: {e}")
return False
else:
print(f"Bot {bot_id} not found")
return False return False
if process: try:
process = self.bots[bot_id]
process.terminate()
try: try:
print(f"Cleaning up process for bot {bot_id}") await asyncio.wait_for(process.wait(), timeout=5.0)
await self._cleanup_process(process) except asyncio.TimeoutError:
del self.bots[bot_id] process.kill()
await process.wait()
# Update state file - only remove this bot
state = self._load_state() del self.bots[bot_id]
if bot_id in state: logger.info(f"Stopped bot {bot_id}")
print(f"Removing {bot_id} from state file") return True
del state[bot_id]
with open(self.state_file, 'w') as f: except Exception as e:
json.dump(state, f) logger.error(f"Failed to stop bot {bot_id}: {e}")
print(f"State file updated, remaining bots: {list(state.keys())}") return False
return True
except Exception as e:
print(f"Error cleaning up process for bot {bot_id}: {e}")
return False
async def restart_bot(self, bot_id: str) -> bool: async def restart_bot(self, bot_id: str) -> bool:
"""Restart a running bot instance. """Restart a running bot instance.
@ -378,6 +103,7 @@ class BotManager:
# Find the config file for this bot # Find the config file for this bot
config_path = self.config_dir / f"{bot_id}.yaml" config_path = self.config_dir / f"{bot_id}.yaml"
if not config_path.exists(): if not config_path.exists():
logger.error(f"Config file not found for bot {bot_id}")
return False return False
# Stop the bot if it's running # Stop the bot if it's running
@ -388,267 +114,81 @@ class BotManager:
# Start the bot with the same config # Start the bot with the same config
return await self.start_bot(config_path) return await self.start_bot(config_path)
async def quiet_bot(self, bot_id: str) -> bool: async def list_bots(self) -> List[Dict]:
"""Disable song announcements for a running bot. """List all running bot instances.
Args:
bot_id: ID of the bot to quiet
Returns:
bool: True if command was sent successfully
"""
# Check if bot exists in state
state = self._load_state()
if bot_id not in state:
print(f"Bot {bot_id} not found")
return False
try:
# Get the socket path for this bot
socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock"
if not socket_path.exists():
print(f"Socket for bot {bot_id} not found at {socket_path}")
return False
# Send quiet command
print(f"Sending quiet command to bot {bot_id}")
reader, writer = await asyncio.open_unix_connection(str(socket_path))
writer.write(b'quiet')
await writer.drain()
writer.close()
await writer.wait_closed()
print(f"Quiet command sent to bot {bot_id}")
return True
except Exception as e:
print(f"Error sending quiet command to bot {bot_id}: {e}")
return False
async def unquiet_bot(self, bot_id: str) -> bool:
"""Enable song announcements for a running bot.
Args:
bot_id: ID of the bot to unquiet
Returns:
bool: True if command was sent successfully
"""
# Check if bot exists in state
state = self._load_state()
if bot_id not in state:
print(f"Bot {bot_id} not found")
return False
try:
# Get the socket path for this bot
socket_path = Path(tempfile.gettempdir()) / f"icecast_bot_{bot_id}.sock"
if not socket_path.exists():
print(f"Socket for bot {bot_id} not found at {socket_path}")
return False
# Send unquiet command
print(f"Sending unquiet command to bot {bot_id}")
reader, writer = await asyncio.open_unix_connection(str(socket_path))
writer.write(b'unquiet')
await writer.drain()
writer.close()
await writer.wait_closed()
print(f"Unquiet command sent to bot {bot_id}")
return True
except Exception as e:
print(f"Error sending unquiet command to bot {bot_id}: {e}")
return False
async def list_bots(self) -> bool:
"""List all running bots.
Returns: Returns:
bool: True if any bots are running List[Dict]: List of bot info dictionaries
""" """
state = self._load_state() bot_info = []
for bot_id, process in self.bots.items():
# Check if any bots are running info = {
if not state: 'id': bot_id,
print("No bots running") 'pid': process.pid,
return False 'running': process.returncode is None
}
# Track unique PIDs to avoid duplicates bot_info.append(info)
seen_pids = set() return bot_info
unique_bots = {}
# Filter out duplicates based on PID
for bot_id, info in state.items():
pid = info.get('pid')
if pid and pid not in seen_pids:
seen_pids.add(pid)
unique_bots[bot_id] = info
if not unique_bots:
print("No bots running")
return False
# Print header
print("\nRunning Bots:")
print("-" * 80)
print(f"{'ID':<20} {'PID':<8} {'Status':<10} {'Command':<40}")
print("-" * 80)
# Print each bot's status
for bot_id, info in unique_bots.items():
pid = info.get('pid')
config = info.get('config', '')
# Check if process is still running
try:
os.kill(pid, 0)
status = "running"
except ProcessLookupError:
status = "stopped"
continue # Skip stopped processes
except Exception:
status = "unknown"
# Get command line
try:
with open(f"/proc/{pid}/cmdline", 'rb') as f:
cmdline = f.read().replace(b'\0', b' ').decode()
except Exception:
cmdline = f"Unknown (PID: {pid})"
print(f"{bot_id:<20} {pid:<8} {status:<10} {cmdline:<40}")
print("-" * 80)
print()
return True
async def cleanup(self): async def cleanup(self):
"""Clean up all running bots.""" """Clean up all running bots."""
if self.monitor_task: for bot_id in list(self.bots.keys()):
self.monitor_task.cancel() await self.stop_bot(bot_id)
try:
await self.monitor_task
except asyncio.CancelledError:
pass
await self.stop_bot("all")
async def main(): async def main():
parser = argparse.ArgumentParser(description='Icecast IRC Bot Manager') parser = argparse.ArgumentParser(description='Icecast IRC Bot Manager')
parser.add_argument('--config', type=str, help='Path to config file or directory') parser.add_argument('--config', type=str, help='Path to config file or directory')
parser.add_argument('command', choices=['start', 'stop', 'restart', 'list', 'quiet', 'unquiet'], help='Command to execute') parser.add_argument('command', choices=['start', 'stop', 'restart', 'list'], help='Command to execute')
parser.add_argument('bot_id', nargs='?', help='Bot ID for start/stop/restart/quiet/unquiet commands, or "all" to stop all bots') parser.add_argument('bot_id', nargs='?', help='Bot ID for start/stop/restart commands')
args = parser.parse_args() args = parser.parse_args()
manager = BotManager() manager = BotManager()
should_cleanup = False # Only cleanup for certain commands
try: try:
if args.command == 'list': if args.command == 'list':
if await manager.list_bots(): bot_info = await manager.list_bots()
# If list_bots returns True, we've printed the list if bot_info:
pass print(json.dumps(bot_info, indent=2))
else:
print("No bots running")
elif args.command == 'start': elif args.command == 'start':
should_cleanup = True # Need cleanup for start command
if not args.config: if not args.config:
print("Error: --config required for start command") print("Error: --config required for start command")
sys.exit(1) sys.exit(1)
config_path = Path(args.config) config_path = Path(args.config)
if config_path.is_dir(): if config_path.is_dir():
# Start all bots in directory # Start all bots in directory
success = True
for config_file in config_path.glob('*.yaml'): for config_file in config_path.glob('*.yaml'):
if not await manager.start_bot(config_file): await manager.start_bot(config_file)
success = False
if not success:
sys.exit(1)
else: else:
# Start single bot # Start single bot
if not await manager.start_bot(config_path): await manager.start_bot(config_path)
sys.exit(1)
# If we started any bots successfully, keep running until interrupted
if manager.bots:
try:
# Keep the manager running
while True:
await asyncio.sleep(1)
if not manager.bots:
break
except asyncio.CancelledError:
pass
elif args.command == 'stop': elif args.command == 'stop':
# Don't need cleanup for stop command as it already cleans up
should_cleanup = False
if not args.bot_id: if not args.bot_id:
print("Error: bot_id required for stop command (use 'all' to stop all bots)") print("Error: bot_id required for stop command")
sys.exit(1)
if not await manager.stop_bot(args.bot_id):
sys.exit(1) sys.exit(1)
await manager.stop_bot(args.bot_id)
elif args.command == 'restart': elif args.command == 'restart':
should_cleanup = True # Need cleanup for restart command
if not args.bot_id: if not args.bot_id:
print("Error: bot_id required for restart command") print("Error: bot_id required for restart command")
sys.exit(1) sys.exit(1)
if args.bot_id == "all": await manager.restart_bot(args.bot_id)
print("Error: restart all is not supported")
sys.exit(1)
if not await manager.restart_bot(args.bot_id):
sys.exit(1)
# If we restarted successfully, keep running until interrupted
if manager.bots:
try:
# Keep the manager running
while True:
await asyncio.sleep(1)
if not manager.bots:
break
except asyncio.CancelledError:
pass
elif args.command == 'quiet':
should_cleanup = False # Don't need cleanup for quiet command
if not args.bot_id:
print("Error: bot_id required for quiet command")
sys.exit(1)
if args.bot_id == "all":
print("Error: quiet all is not supported")
sys.exit(1)
if not await manager.quiet_bot(args.bot_id):
sys.exit(1)
elif args.command == 'unquiet':
should_cleanup = False # Don't need cleanup for unquiet command
if not args.bot_id:
print("Error: bot_id required for unquiet command")
sys.exit(1)
if args.bot_id == "all":
print("Error: unquiet all is not supported")
sys.exit(1)
if not await manager.unquiet_bot(args.bot_id):
sys.exit(1)
except KeyboardInterrupt: except KeyboardInterrupt:
should_cleanup = True # Need cleanup for keyboard interrupt logger.info("Shutting down...")
except Exception: await manager.cleanup()
should_cleanup = True # Need cleanup for errors except Exception as e:
logger.error(f"Unhandled error: {e}")
sys.exit(1) sys.exit(1)
finally:
# Only clean up if we need to
if should_cleanup:
await manager.cleanup()
if __name__ == '__main__': if __name__ == '__main__':
# Set up signal handlers for graceful shutdown # Set up signal handlers
def signal_handler(sig, frame):
# Don't exit immediately, let the cleanup happen
asyncio.get_event_loop().stop()
for sig in (signal.SIGTERM, signal.SIGINT): for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, signal_handler) signal.signal(sig, lambda s, f: sys.exit(0))
# Run the manager # Run the manager
asyncio.run(main()) asyncio.run(main())

View File

@ -13,8 +13,8 @@ Environment=PATH=/opt/icecast-irc-bot/venv/bin:$PATH
ExecStart=/opt/icecast-irc-bot/venv/bin/python3 /usr/local/bin/icecast-irc-bot-manager --config /etc/icecast-irc-bot/config.yaml ExecStart=/opt/icecast-irc-bot/venv/bin/python3 /usr/local/bin/icecast-irc-bot-manager --config /etc/icecast-irc-bot/config.yaml
Restart=on-failure Restart=on-failure
RestartSec=5s RestartSec=5s
StandardOutput=journal StandardOutput=append:/var/log/icecast-irc-bot/bot.log
StandardError=journal StandardError=append:/var/log/icecast-irc-bot/error.log
# Security settings # Security settings
NoNewPrivileges=yes NoNewPrivileges=yes

View File

@ -68,6 +68,4 @@ echo "Usage:"
echo "icecast-irc-bot-manager list # List running bots" echo "icecast-irc-bot-manager list # List running bots"
echo "icecast-irc-bot-manager start --config FILE # Start a bot" echo "icecast-irc-bot-manager start --config FILE # Start a bot"
echo "icecast-irc-bot-manager stop BOT_ID # Stop a bot" echo "icecast-irc-bot-manager stop BOT_ID # Stop a bot"
echo "icecast-irc-bot-manager restart BOT_ID # Restart a bot" echo "icecast-irc-bot-manager restart BOT_ID # Restart a bot"
echo "icecast-irc-bot-manager quiet BOT_ID # Disable song announcements"
echo "icecast-irc-bot-manager unquiet BOT_ID # Enable song announcements"

121
logger.py
View File

@ -1,121 +0,0 @@
#!/usr/bin/env python3
"""
Logger module for the Icecast metadata IRC announcer.
Uses Loguru for simple yet powerful logging.
"""
import os
import sys
from pathlib import Path
from loguru import logger
class LogManager:
"""
Manages logging configuration for the Icecast metadata IRC announcer.
This class provides a simple interface for configuring and using Loguru
throughout the project. It handles log file rotation, formatting, and
different log levels.
"""
def __init__(self, config=None):
"""
Initialize the LogManager with optional configuration.
Args:
config: Optional dictionary with logging configuration.
If None, default configuration is used.
"""
self.config = config or {}
self.log_dir = Path(self.config.get('log_dir', 'logs'))
self.log_level = self.config.get('level', 'INFO').upper()
self.log_format = self.config.get('format',
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
# Create log directory if it doesn't exist
if not self.log_dir.exists():
self.log_dir.mkdir(parents=True, exist_ok=True)
# Configure logger
self._configure_logger()
def _configure_logger(self):
"""Configure Loguru logger with appropriate sinks and formats."""
# Remove default handler
logger.remove()
# Add console handler
logger.add(
sys.stderr,
format=self.log_format,
level=self.log_level,
colorize=True
)
# Add file handler with rotation
log_file = self.log_dir / "icecast_bot.log"
logger.add(
str(log_file),
format=self.log_format,
level=self.log_level,
rotation="10 MB", # Rotate when file reaches 10MB
compression="zip", # Compress rotated logs
retention="1 week", # Keep logs for 1 week
backtrace=True, # Include backtrace in error logs
diagnose=True # Include variables in error logs
)
def get_logger(self, name=None):
"""
Get a logger instance with the given name.
Args:
name: Optional name for the logger. If None, the calling module's name is used.
Returns:
A Loguru logger instance.
"""
if name:
return logger.bind(name=name)
return logger
@staticmethod
def set_level(level):
"""
Set the log level for all handlers.
Args:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
for handler_id in logger._core.handlers:
logger.configure(handler_id, level=level)
# Create a default instance for easy import
log_manager = LogManager()
get_logger = log_manager.get_logger
# Export the logger directly for simple usage
log = logger
# For convenience, export common log levels
debug = logger.debug
info = logger.info
warning = logger.warning
error = logger.error
critical = logger.critical
exception = logger.exception
# Example usage:
# from logger import log, debug, info, error
# log.info("This is an info message")
# debug("This is a debug message")
#
# # Or with a named logger:
# from logger import get_logger
# logger = get_logger("my_module")
# logger.info("This is a message from my_module")

877
main.py Executable file → Normal file

File diff suppressed because it is too large Load Diff

View File

@ -16,11 +16,6 @@ requires-python = ">=3.11"
requires = ["hatchling"] requires = ["hatchling"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[tool.hatch.version]
source = "regex"
path = "VERSION"
pattern = "^(?P<version>[0-9]+\\.[0-9]+\\.[0-9]+)$"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["."] packages = ["."]

View File

@ -1,4 +1,3 @@
asif asif
aiohttp aiohttp
pyyaml pyyaml
loguru>=0.7.0