Compare commits

...

2 Commits

Author SHA1 Message Date
6817437380
chore: prepare release 1.0.1 2025-02-24 11:17:23 -08:00
3cd3e98094
Release version 1.0.0 2025-02-24 11:08:25 -08:00
6 changed files with 727 additions and 129 deletions

61
CHANGELOG.md Normal file
View File

@ -0,0 +1,61 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [1.0.1] - 2024-02-24
### Added
- Configurable logging levels via config.yaml
- Smart URL resolution for metadata fetching
- Connection status verification and reporting
- Admin command system with permissions
- Automatic restart functionality via bot.sh
- Enhanced command handling with prefix configuration
- Private message command control
- New admin commands:
- !start - Start stream monitoring
- !stop - Stop stream monitoring
- !reconnect - Reconnect to stream with status feedback
- !restart - Restart the bot (requires bot.sh)
- !quit - Shutdown the bot
- !help command showing available commands based on user permissions
- ERROR.log file for critical issues
- Detailed debug logging for troubleshooting
### Changed
- Improved metadata fetching with multiple URL patterns
- Enhanced error handling and reporting
- Better stream connection status feedback
- More informative health check messages
- Cleaner logging output with configurable verbosity
- Updated documentation with new features and configuration options
### Fixed
- Metadata fetching issues with different URL patterns
- Command handling in channels vs private messages
- Stream reconnection verification
- Error reporting and logging clarity
- Configuration file structure and validation
### Other
- Added version information
## [1.0.0] - 2024-02-23
### Added
- Initial release
- Basic Icecast stream monitoring
- IRC channel announcements
- Simple command system (!np)
- Basic error handling and reconnection
- Multi-bot support
- Configuration via YAML files
[Unreleased]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.1...HEAD
[1.0.1]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/compare/v1.0.0...v1.0.1
[1.0.0]: https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases/tag/v1.0.0

View File

@ -1,5 +1,7 @@
# Icecast-metadata-IRC-announcer
[![Version](https://img.shields.io/badge/version-1.0.1-blue.svg)](https://code.cottongin.xyz/cottongin/Icecast-metadata-IRC-announcer/releases/tag/v1.0.1)
A simple asynchronous Python bot that monitors an Icecast stream and announces track changes to an IRC channel. Supports running multiple instances with different configurations.
Note: This is a work in progress. It has only been tested on **Python 3.12.6**.
@ -10,8 +12,10 @@ Note: This is a work in progress. It has only been tested on **Python 3.12.6**.
- Configurable via YAML files and command line arguments
- Supports running multiple bot instances simultaneously
- Pattern-based song title filtering
- Responds to !np commands in IRC channels
- Automatic reconnection and error recovery
- Configurable logging levels and output
- Smart URL resolution for metadata fetching
- Automatic reconnection and error recovery with status reporting
- Admin commands with permission system
## Dependencies
@ -50,12 +54,43 @@ announce:
ignore_patterns:
- "Unknown"
- "Unable to fetch metadata"
# Add more patterns to ignore
- "Error fetching metadata"
commands:
prefix: "!" # Command prefix (e.g. !np, !help)
require_nick_prefix: false # If true, commands must be prefixed with "botname: " or "botname, "
allow_private_commands: false # If true, allows commands in private messages
admin:
users: # List of users who can use admin commands (use "*" for anyone)
- "*"
logging:
level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL
```
## Usage
### Single Bot Mode
### Running with Automatic Restart Support
The recommended way to run the bot is using the provided `bot.sh` script, which handles automatic restarts when using the `!restart` command:
```bash
# Make the script executable
chmod +x bot.sh
# Run a single bot
./bot.sh config.yaml
# Run multiple bots
./bot.sh config1.yaml config2.yaml config3.yaml
```
### Manual Running
You can also run the bot directly with Python:
#### Single Bot Mode
Run with config file:
```bash
@ -75,32 +110,51 @@ Available command line arguments:
- `--irc-channel`: IRC channel to join
- `--stream-url`: Icecast base URL
- `--stream-endpoint`: Stream endpoint
- `--cmd-prefix`: Command prefix character(s)
### Multiple Bot Mode
#### Multiple Bot Mode
Run multiple instances with different configs:
```bash
python main.py config1.yaml config2.yaml config3.yaml
```
Or use the launcher script:
```bash
python launch.py
```
## IRC Commands
Regular commands:
- `!np`: Shows the currently playing track
- `!help`: Shows available commands
Admin commands:
- `!start`: Start stream monitoring
- `!stop`: Stop stream monitoring
- `!reconnect`: Reconnect to stream (with status feedback)
- `!restart`: Restart the bot (requires using bot.sh)
- `!quit`: Shutdown the bot
## Logging
The bot logs important events to stdout with timestamps. Log level is set to INFO by default.
The bot supports different logging levels configurable in the config.yaml:
- DEBUG: Detailed information for troubleshooting
- INFO: General operational messages (default)
- WARNING: Warning messages and potential issues
- ERROR: Error messages only
- CRITICAL: Critical failures only
Logs include:
- Stream health status
- Command processing
- Connection status
- Error details
The bot also maintains an ERROR.log file for critical issues.
## Error Handling
- Automatically reconnects on connection drops
- Retries stream monitoring on errors
- Logs errors for debugging
- Smart metadata URL resolution
- Connection status verification and reporting
- Health checks every 5 minutes (configurable)
## License

1
VERSION Normal file
View File

@ -0,0 +1 @@
1.0.1

24
bot.sh Executable file
View File

@ -0,0 +1,24 @@
#!/bin/bash
# Check if any arguments were provided
if [ $# -eq 0 ]; then
echo "Usage: $0 <config.yaml> [additional configs...]"
exit 1
fi
while true; do
# Remove any existing restart flags before starting
rm -f .restart_flag_*
python main.py "$@"
# Check for any restart flags
if ls .restart_flag_* 1> /dev/null 2>&1; then
echo "Restart flag(s) found, restarting bot(s)..."
sleep 1
continue
else
echo "Bot(s) exited without restart flag, stopping..."
break
fi
done

View File

@ -1,19 +1,31 @@
irc:
host: "irc.someircserver.net"
port: 6667 # asif does not support ssl as of 2025-02-23
nick: "bot"
user: "bot"
realname: "Botty Bot - https://bot.site"
channel: "#channel"
host: "irc.libera.chat"
port: 6667
nick: "IcecastBot"
user: "icecastbot"
realname: "Icecast IRC Bot"
channel: "#yourchannel"
stream:
url: "http://your-icecast-server.com:8000/" # this is the *base* url of the icecast server
endpoint: "stream" # this is the endpoint of the icecast server (e.g. /stream or .mp3)
health_check_interval: 300
url: "https://your.stream.url" # Base URL without /stream or .mp3
endpoint: "/stream" # The endpoint part (e.g. /stream, /radio.mp3)
health_check_interval: 300 # How often to log health status (in seconds)
announce:
format: "\x02Now playing:\x02 {song}"
ignore_patterns:
format: "\x02Now playing:\x02 {song}" # Format for song announcements
ignore_patterns: # Don't announce songs matching these patterns
- "Unknown"
- "Unable to fetch metadata"
# Add more patterns to ignore here
- "Error fetching metadata"
commands:
prefix: "!" # Command prefix (e.g. !np, !help)
require_nick_prefix: false # If true, commands must be prefixed with "botname: " or "botname, "
allow_private_commands: false # If true, allows commands in private messages
admin:
users: # List of users who can use admin commands (use "*" for anyone)
- "*"
logging:
level: "INFO" # Logging level: DEBUG, INFO, WARNING, ERROR, or CRITICAL

506
main.py
View File

@ -2,6 +2,7 @@
# Set up logging configuration before imports
import logging
import os
class AsifFilter(logging.Filter):
def filter(self, record):
@ -10,29 +11,47 @@ class AsifFilter(logging.Filter):
record.name.startswith('asif') or
(isinstance(record.msg, str) and record.msg.startswith('Joined channel')))
# Defer logging setup until after config is loaded
def setup_logging(log_level: str = 'INFO'):
"""Set up logging with the specified level."""
# Convert string level to logging constant
numeric_level = getattr(logging, log_level.upper(), logging.INFO)
# Set up base logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(message)s',
level=numeric_level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# Apply filter to root logger
logging.getLogger().addFilter(AsifFilter())
# Also apply to asif's logger specifically
# Configure asif's logger
asif_logger = logging.getLogger('asif')
asif_logger.addFilter(AsifFilter())
asif_logger.setLevel(logging.CRITICAL)
asif_logger.propagate = False
from asif import Client
# Set up error logging for asif
error_handler = logging.FileHandler('ERROR.log')
error_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# Patch asif's Client class to use error logger
from asif import Client
# Patch asif's Client class
def patch_client_bg(self, coro):
"""Patch for Client._bg to properly handle SystemExit"""
async def runner():
try:
await coro
except SystemExit:
raise # Re-raise SystemExit
except Exception:
self._log.exception("async: Coroutine raised exception")
return asyncio.ensure_future(runner())
# Patch asif's Client class to use error logger and patched _bg
def silent_client_init(self, *args, **kwargs):
# Create a logger that writes to ERROR.log
error_logger = logging.getLogger('asif.client')
@ -43,6 +62,9 @@ def silent_client_init(self, *args, **kwargs):
# Store the logger
self._log = error_logger
# Patch the _bg method
self._bg = patch_client_bg.__get__(self)
# Call the original __init__
original_init(self, *args, **kwargs)
@ -59,6 +81,7 @@ import argparse
import yaml
from pathlib import Path
from typing import List, Optional
import sys
# ANSI color codes
class Colors:
@ -142,7 +165,12 @@ class BotLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return f'[{self.colored_botname}] {msg}', kwargs
# Add flag file constant at the top level
RESTART_FLAG_FILE = ".restart_flag"
class IcecastBot:
VERSION = "1.0.1"
def __init__(self, config_path: Optional[str] = None):
# Load config
self.config = self.load_config(config_path)
@ -153,6 +181,8 @@ class IcecastBot:
{'botname': f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}'}
)
self.logger.info(f"Starting Icecast IRC Bot v{self.VERSION}")
# Initialize IRC bot with config
self.bot = Client(
host=self.config['irc']['host'],
@ -173,6 +203,21 @@ class IcecastBot:
self.last_health_check = time.time()
self.health_check_interval = self.config['stream']['health_check_interval']
# Get command settings from config
self.cmd_prefix = self.config.get('commands', {}).get('prefix', '!')
self.require_nick_prefix = self.config.get('commands', {}).get('require_nick_prefix', False)
self.allow_private_commands = self.config.get('commands', {}).get('allow_private_commands', False)
# Control flags
self.monitor_task = None
self.should_monitor = True
self.is_monitoring = False
self.admin_users = self.config.get('admin', {}).get('users', ['*']) # '*' means anyone can use admin commands
self.current_process = None # Track the current subprocess
# Create a unique restart flag file for this bot instance
self.restart_flag_file = f".restart_flag_{self.config['irc']['nick']}_{self.stream_endpoint}"
self.setup_handlers()
@staticmethod
@ -195,9 +240,14 @@ class IcecastBot:
'announce': {
'format': "\x02Now playing:\x02 {song}",
'ignore_patterns': ['Unknown', 'Unable to fetch metadata']
},
'logging': {
'level': 'INFO'
}
}
# Set up logging with configured level
setup_logging(config.get('logging', {}).get('level', 'INFO'))
return config
def should_announce_song(self, song: str) -> bool:
@ -213,7 +263,8 @@ class IcecastBot:
except Exception as e:
self.logger.error(f"Error joining channel: {e}")
asyncio.create_task(self.monitor_metadata())
if self.should_monitor:
await self.start_monitoring()
@self.bot.on_join()
async def on_join(channel):
@ -221,41 +272,339 @@ class IcecastBot:
if not self.channel:
self.channel = channel
@self.bot.on_message(re.compile("^!np"))
def create_command_pattern(cmd: str) -> re.Pattern:
"""Create a regex pattern for a command that handles nick prefixes."""
if self.require_nick_prefix:
pattern = f"^({self.config['irc']['nick']}[:,] )?{re.escape(self.cmd_prefix)}{cmd}($| .*$)"
else:
pattern = f"^{re.escape(self.cmd_prefix)}{cmd}($| .*$)"
self.logger.debug(f"Created command pattern for '{cmd}': {pattern}")
return re.compile(pattern)
# Global message handler for debugging
@self.bot.on_message()
async def debug_message(message):
"""Debug handler to log all messages"""
try:
# Log full message details only at DEBUG level
self.logger.debug(
f"Received message: "
f"recipient={getattr(message, 'recipient', None)!r}, "
f"sender={getattr(message, 'sender', None)!r}, "
f"text={getattr(message, 'text', None)!r}"
)
# Test each command pattern
msg = getattr(message, 'text', None)
if msg and isinstance(msg, str):
for cmd in ['np', 'help', 'restart', 'quit', 'reconnect', 'stop', 'start']:
pattern = create_command_pattern(cmd)
if pattern.match(msg):
self.logger.debug(f"Command matched: {cmd}")
except Exception as e:
self.logger.error(f"Error in debug message handler: {e}")
# Command handlers with additional debug logging
@self.bot.on_message(create_command_pattern('np'))
async def now_playing(message):
await message.reply(self.reply.format(song=self.current_song))
self.logger.debug(f"np handler called with message: {getattr(message, 'text', None)!r}")
recipient = getattr(message, 'recipient', None)
# Check if recipient is a Channel object
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
self.logger.debug(f"Recipient check - is_channel: {is_channel}, recipient: {recipient}")
if not self.allow_private_commands and not is_channel:
self.logger.debug("Ignoring private np command")
return
try:
if is_channel:
await recipient.message(self.reply.format(song=self.current_song))
self.logger.debug("Successfully sent np reply")
else:
self.logger.debug("Could not send np reply - invalid recipient")
except Exception as e:
self.logger.error(f"Error in np handler: {e}", exc_info=True)
@self.bot.on_message(create_command_pattern('help'))
async def help_command(message):
self.logger.debug(f"help handler called with message: {getattr(message, 'text', None)!r}")
recipient = getattr(message, 'recipient', None)
# Check if recipient is a Channel object
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
self.logger.debug(f"Recipient check - is_channel: {is_channel}, recipient: {recipient}")
if not self.allow_private_commands and not is_channel:
self.logger.debug("Ignoring private help command")
return
try:
prefix = self.cmd_prefix
help_text = (
f"Icecast IRC Bot v{self.VERSION}\n"
f"Available commands:\n"
f"{prefix}np - Show current song\n"
f"{prefix}help - Show this help message\n"
)
if self.is_admin(message.sender):
help_text += (
f"{prefix}start - Start stream monitoring\n"
f"{prefix}stop - Stop stream monitoring\n"
f"{prefix}reconnect - Reconnect to stream\n"
f"{prefix}restart - Restart the bot\n"
f"{prefix}quit - Shutdown the bot\n"
)
if is_channel:
await recipient.message(help_text)
self.logger.debug("Successfully sent help reply")
else:
self.logger.debug("Could not send help reply - invalid recipient")
except Exception as e:
self.logger.error(f"Error in help handler: {e}", exc_info=True)
@self.bot.on_message(create_command_pattern('restart'))
async def restart_bot(message):
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
if not self.is_admin(message.sender):
if is_channel:
await recipient.message("You don't have permission to use this command.")
return
self.logger.info(f"Restart command received from {message.sender}")
await self.stop_monitoring()
try:
await self.bot.quit("Restarting...")
with open(self.restart_flag_file, 'w') as f:
f.write('restart')
except Exception as e:
self.logger.error(f"Error during restart: {e}")
sys.exit(0)
@self.bot.on_message(create_command_pattern('quit'))
async def quit_bot(message):
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
if not self.is_admin(message.sender):
if is_channel:
await recipient.message("You don't have permission to use this command.")
return
self.logger.info(f"Quit command received from {message.sender}")
await self.stop_monitoring()
try:
await self.bot.quit("Shutting down...")
except Exception as e:
self.logger.error(f"Error during shutdown: {e}")
sys.exit(0)
@self.bot.on_message(create_command_pattern('reconnect'))
async def reconnect_stream(message):
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
if not self.is_admin(message.sender):
if is_channel:
await recipient.message("You don't have permission to use this command.")
return
self.logger.info(f"Reconnect command received from {message.sender}")
success = await self.restart_monitoring()
if not success and is_channel:
await recipient.message("Stream reconnection may have failed. Check logs for details.")
@self.bot.on_message(create_command_pattern('stop'))
async def stop_monitoring(message):
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
if not self.is_admin(message.sender):
if is_channel:
await recipient.message("You don't have permission to use this command.")
return
self.logger.info(f"Stop command received from {message.sender}")
await self.stop_monitoring()
if is_channel:
await recipient.message("Stream monitoring stopped.")
@self.bot.on_message(create_command_pattern('start'))
async def start_monitoring(message):
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
if not self.is_admin(message.sender):
if is_channel:
await recipient.message("You don't have permission to use this command.")
return
self.logger.info(f"Start command received from {message.sender}")
await self.start_monitoring()
if is_channel:
await recipient.message("Stream monitoring started.")
def is_admin(self, user: str) -> bool:
"""Check if a user has admin privileges.
Args:
user: Full IRC user string (nickname!username@hostname) or User object
Returns:
bool: True if user has admin privileges
"""
try:
if hasattr(user, 'name'):
nickname = user.name
else:
nickname = user.split('!')[0] if '!' in user else user
except Exception as e:
self.logger.error(f"Error extracting nickname: {e}")
return False
return '*' in self.admin_users or nickname in self.admin_users
async def start_monitoring(self):
"""Start the metadata monitoring task."""
if not self.is_monitoring:
self.should_monitor = True
self.monitor_task = asyncio.create_task(self.monitor_metadata())
self.is_monitoring = True
self.logger.info("Started metadata monitoring task")
async def stop_monitoring(self):
"""Stop the metadata monitoring task."""
self.should_monitor = False
self.is_monitoring = False
# First, terminate the subprocess if it exists
if self.current_process and self.current_process.returncode is None:
try:
self.current_process.terminate()
try:
await asyncio.wait_for(self.current_process.wait(), timeout=2.0)
except asyncio.TimeoutError:
self.current_process.kill()
await self.current_process.wait()
except Exception as e:
self.logger.error(f"Error terminating subprocess: {e}")
finally:
self.current_process = None
# Then cancel the monitoring task
if self.monitor_task:
self.monitor_task.cancel()
try:
await self.monitor_task
except asyncio.CancelledError:
pass
except Exception as e:
self.logger.error(f"Error cancelling monitor task: {e}")
finally:
self.monitor_task = None
self.logger.info("Stopped metadata monitoring task")
async def restart_monitoring(self):
"""Restart the metadata monitoring task and verify the reconnection."""
await self.stop_monitoring()
# Store the channel for status updates
notify_channel = self.channel
try:
await self.start_monitoring()
# Wait for up to 10 seconds to confirm data is being received
start_time = time.time()
while time.time() - start_time < 10:
if self.current_process and self.current_process.returncode is None:
# Try to fetch metadata to confirm connection
test_song = await self.fetch_json_metadata()
if test_song and "Unable to fetch metadata" not in test_song:
if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'):
await notify_channel.message("Stream reconnected successfully.")
return True
await asyncio.sleep(1)
# If we get here, the connection wasn't confirmed
if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'):
await notify_channel.message("Stream reconnection attempt completed, but status uncertain. Check logs for details.")
return False
except Exception as e:
self.logger.error(f"Error during stream reconnection: {e}")
if notify_channel and hasattr(notify_channel, 'name') and notify_channel.name.startswith('#'):
await notify_channel.message(f"Failed to reconnect to stream: {str(e)}")
return False
async def fetch_json_metadata(self):
"""Fetch metadata from the Icecast JSON status endpoint."""
try:
# Try different URL patterns
base_urls = [
self.stream_url, # Original URL
self.stream_url.replace('live.', 'listen.'), # Try listen. instead of live.
'/'.join(self.stream_url.split('/')[:-1]) if '/' in self.stream_url else self.stream_url # Try parent path
]
for base_url in base_urls:
try:
url = f"{base_url}/status-json.xsl"
self.logger.debug(f"Attempting to fetch metadata from: {url}")
async with aiohttp.ClientSession() as session:
url = f"{self.stream_url}/status-json.xsl"
async with session.get(url) as response:
if response.status == 200:
data = await response.text()
json_data = json.loads(data)
json_data = json.loads(data)
if 'icestats' in json_data:
sources = json_data['icestats'].get('source', [])
if isinstance(sources, list):
for src in sources:
if src['listenurl'].endswith(f'{self.stream_endpoint}'):
source = src
else:
source = sources
if not isinstance(sources, list):
sources = [sources]
title = source.get('title') or source.get('song') or source.get('current_song')
# Find our stream
for src in sources:
if src.get('listenurl', '').endswith(self.stream_endpoint):
title = src.get('title') or src.get('song') or src.get('current_song')
if title:
return title
except aiohttp.ClientError as e:
self.logger.debug(f"Failed to fetch from {url}: {e}")
continue
except json.JSONDecodeError as e:
self.logger.debug(f"Failed to parse JSON from {url}: {e}")
continue
self.logger.warning("Unable to fetch metadata from any URL")
return "Unable to fetch metadata"
except Exception as e:
self.logger.error(f"Error fetching JSON metadata: {e}")
self.logger.error(f"Error fetching JSON metadata: {e}", exc_info=True)
return "Error fetching metadata"
async def monitor_metadata(self):
await asyncio.sleep(5)
while True:
while self.should_monitor:
try:
cmd = [
'curl',
@ -270,24 +619,41 @@ class IcecastBot:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.current_process = process # Store the process reference
self.logger.info("Started stream monitoring")
last_data_received = time.time()
buffer = b""
last_json_check = time.time()
json_check_interval = 60 # Fallback interval if ICY updates fail
data_timeout = 180 # 3 minutes without data is considered a failure
empty_chunks_count = 0
max_empty_chunks = 5 # After 5 empty chunks in a row, reconnect
while True:
chunk = await process.stdout.read(8192)
if not chunk:
try:
chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
if chunk:
last_data_received = time.time()
buffer += chunk
empty_chunks_count = 0 # Reset counter on successful data
else:
empty_chunks_count += 1
self.logger.warning(f"Received empty chunk from stream ({empty_chunks_count}/{max_empty_chunks})")
if empty_chunks_count >= max_empty_chunks:
self.logger.error("Too many empty chunks in a row, restarting connection")
break
if time.time() - last_data_received > data_timeout:
self.logger.error(f"No data received for {data_timeout} seconds, restarting connection")
break
buffer += chunk
current_time = time.time()
# Periodic health check
if current_time - self.last_health_check >= self.health_check_interval:
self.logger.info("Monitor status: Active - processing stream data")
self.logger.info(
f"Monitor status: Active - Last data received {int(current_time - last_data_received)}s ago"
)
self.last_health_check = current_time
# Look for metadata marker but fetch from JSON
@ -310,6 +676,7 @@ class IcecastBot:
if current_time - last_json_check >= json_check_interval:
new_song = await self.fetch_json_metadata()
if "Unable to fetch metadata" in new_song:
if time.time() - last_data_received > data_timeout:
break
if new_song and new_song != self.current_song:
self.logger.info(f"Now Playing (fallback): {new_song}")
@ -319,8 +686,31 @@ class IcecastBot:
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
self.logger.warning("Timeout while reading stream data")
if time.time() - last_data_received > data_timeout:
self.logger.error("Stream read timeout exceeded limit, restarting connection")
break
continue
except Exception as e:
self.logger.error(f"Error in stream processing loop: {e}")
break
# Check if process is still running and terminate if needed
if process.returncode is None:
try:
process.terminate()
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
self.logger.warning("Process did not terminate gracefully, killing it")
process.kill()
await process.wait()
self.logger.warning("Stream monitor ended, restarting...")
except Exception as e:
self.logger.error(f"Error terminating process: {e}")
finally:
self.current_process = None
self.logger.warning("Stream monitor ended, restarting in 5 seconds...")
await asyncio.sleep(5)
except Exception as e:
@ -331,16 +721,32 @@ class IcecastBot:
"""Announce song if it doesn't match any ignore patterns."""
try:
if self.channel and self.should_announce_song(song):
# Use the stored channel object directly
if hasattr(self.channel, 'name') and self.channel.name.startswith('#'):
await self.channel.message(self.reply.format(song=song))
self.logger.debug(f"Successfully announced song: {song}")
else:
self.logger.error(f"Could not announce song - invalid channel object: {self.channel}")
except Exception as e:
self.logger.error(f"Error announcing song: {e}")
self.logger.error(f"Error announcing song: {e}", exc_info=True)
async def start(self):
await self.bot.run()
async def run_multiple_bots(config_paths: List[str]):
"""Run multiple bot instances concurrently."""
bots = [IcecastBot(config_path) for config_path in config_paths]
bots = []
for config_path in config_paths:
try:
bot = IcecastBot(config_path)
bots.append(bot)
except Exception as e:
logging.error(f"Failed to initialize bot with config {config_path}: {e}")
if not bots:
logging.error("No bots were successfully initialized")
return
await asyncio.gather(*(bot.start() for bot in bots))
if __name__ == "__main__":
@ -353,12 +759,29 @@ if __name__ == "__main__":
parser.add_argument('--irc-channel', type=str, help='IRC channel')
parser.add_argument('--stream-url', type=str, help='Icecast stream URL (base url; do not include /stream or .mp3)')
parser.add_argument('--stream-endpoint', type=str, help='Stream endpoint (e.g. /stream)')
parser.add_argument('--cmd-prefix', type=str, help='Command prefix character(s)')
args = parser.parse_args()
# Check for restart flag files at startup
try:
for flag_file in Path('.').glob('.restart_flag_*'):
try:
flag_file.unlink()
except Exception as e:
logging.error(f"Error removing restart flag file {flag_file}: {e}")
except Exception as e:
logging.error(f"Error handling restart flag files: {e}")
# Set up the event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def run_bot():
try:
if args.configs:
# Multi-bot mode
asyncio.run(run_multiple_bots(args.configs))
await run_multiple_bots(args.configs)
else:
# Single-bot mode
bot = IcecastBot(args.config)
@ -376,5 +799,28 @@ if __name__ == "__main__":
bot.config['stream']['url'] = args.stream_url
if args.stream_endpoint:
bot.config['stream']['endpoint'] = args.stream_endpoint
if args.cmd_prefix:
if 'commands' not in bot.config:
bot.config['commands'] = {}
bot.config['commands']['prefix'] = args.cmd_prefix
asyncio.run(bot.start())
await bot.start()
except Exception as e:
logging.error(f"Unhandled exception: {e}")
sys.exit(1)
try:
# Run the bot
loop.run_until_complete(run_bot())
finally:
try:
# Cancel any remaining tasks
for task in asyncio.all_tasks(loop):
task.cancel()
# Give tasks a chance to cancel
loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True))
# Finally close the loop
loop.close()
except Exception as e:
logging.error(f"Error during cleanup: {e}")
sys.exit(1)