feat(logging): Enhance logging system with colored identifiers and error handling

- Add colored and right-aligned bot identifiers in logs with consistent column width
- Implement unique color assignment for each bot instance (falls back to hash-based when colors exhausted)
- Add padding system to maintain uniform column alignment across all bot instances
- Redirect asif library logs to ERROR.log file
- Add ERROR.log to .gitignore
- Improve log formatting with timestamps and bot identifiers
- Implement dynamic column width adjustment when new bots join
This commit is contained in:
cottongin 2025-02-23 09:02:46 -08:00
parent 8651cc77ad
commit 2f8cca0d5e
Signed by: cottongin
GPG Key ID: A0BD18428A296890
2 changed files with 155 additions and 23 deletions

1
.gitignore vendored
View File

@ -34,6 +34,7 @@ config*.yaml
# Logs # Logs
*.log *.log
logs/ logs/
ERROR.log
# Unit test / coverage reports # Unit test / coverage reports
htmlcov/ htmlcov/

177
main.py
View File

@ -1,30 +1,158 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from asif import Client # Set up logging configuration before imports
import asyncio
import re
import aiohttp
import json
import time
import logging import logging
import argparse
import yaml
from pathlib import Path
from typing import List, Optional
# Set up logging - only show important info class AsifFilter(logging.Filter):
def filter(self, record):
# Block messages from asif module or if they start with "Joined channel"
return not (record.module == 'asif' or
record.name.startswith('asif') or
(isinstance(record.msg, str) and record.msg.startswith('Joined channel')))
# Set up base logging configuration
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(message)s', format='%(asctime)s - %(message)s',
datefmt='%H:%M:%S' datefmt='%H:%M:%S'
) )
logger = logging.getLogger(__name__)
# Apply filter to root logger
logging.getLogger().addFilter(AsifFilter())
# Also apply to asif's logger specifically
asif_logger = logging.getLogger('asif')
asif_logger.addFilter(AsifFilter())
asif_logger.setLevel(logging.CRITICAL)
asif_logger.propagate = False
from asif import Client
# Set up error logging for asif
error_handler = logging.FileHandler('ERROR.log')
error_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# Patch asif's Client class to use error logger
def silent_client_init(self, *args, **kwargs):
# Create a logger that writes to ERROR.log
error_logger = logging.getLogger('asif.client')
error_logger.addHandler(error_handler)
error_logger.propagate = False # Don't send to console
error_logger.setLevel(logging.INFO) # Capture all messages
# Store the logger
self._log = error_logger
# Call the original __init__
original_init(self, *args, **kwargs)
# Save original __init__ and replace it
original_init = Client.__init__
Client.__init__ = silent_client_init
import asyncio
import re
import aiohttp
import json
import time
import argparse
import yaml
from pathlib import Path
from typing import List, Optional
# ANSI color codes
class Colors:
COLORS = [
'\033[94m', # BLUE
'\033[96m', # CYAN
'\033[92m', # GREEN
'\033[93m', # YELLOW
'\033[95m', # MAGENTA
'\033[91m', # RED
]
ENDC = '\033[0m'
BOLD = '\033[1m'
# Track used colors
used_colors = {} # botname -> color mapping
@classmethod
def get_color_for_bot(cls, botname: str) -> str:
"""Get a consistent color for a bot based on its name."""
# If this bot already has a color, return it
if botname in cls.used_colors:
return cls.used_colors[botname]
# If we still have unused colors, use the next available one
unused_colors = [c for c in cls.COLORS if c not in cls.used_colors.values()]
if unused_colors:
color = unused_colors[0]
else:
# If we're out of unique colors, fall back to hash-based selection
color = cls.COLORS[hash(botname) % len(cls.COLORS)]
cls.used_colors[botname] = color
return color
class BotLoggerAdapter(logging.LoggerAdapter):
# Class variables to track maximum lengths
max_nick_length = 0
max_endpoint_length = 0
instances = [] # Keep track of all instances to update padding
def __init__(self, logger, extra):
super().__init__(logger, extra)
botname = extra['botname']
nick, endpoint = botname.split('@')
self.nick = nick
self.endpoint = endpoint
# Update max lengths (without ANSI codes)
old_max_nick = BotLoggerAdapter.max_nick_length
old_max_endpoint = BotLoggerAdapter.max_endpoint_length
BotLoggerAdapter.max_nick_length = max(
BotLoggerAdapter.max_nick_length,
len(nick)
)
BotLoggerAdapter.max_endpoint_length = max(
BotLoggerAdapter.max_endpoint_length,
len(endpoint)
)
# If max lengths changed, update all existing instances
if (old_max_nick != BotLoggerAdapter.max_nick_length or
old_max_endpoint != BotLoggerAdapter.max_endpoint_length):
for instance in BotLoggerAdapter.instances:
instance.update_padding()
# Add self to instances list
BotLoggerAdapter.instances.append(self)
# Initial padding calculation
self.update_padding()
def update_padding(self):
"""Update the colored botname with current padding requirements."""
# Right-align nick, then @ symbol, then colored endpoint
nick_padding = " " * (BotLoggerAdapter.max_nick_length - len(self.nick))
endpoint_padding = " " * (BotLoggerAdapter.max_endpoint_length - len(self.endpoint))
self.colored_botname = f"{nick_padding}{self.nick}@{Colors.BOLD}{Colors.get_color_for_bot(self.nick+'@'+self.endpoint)}{self.endpoint}{Colors.ENDC}{endpoint_padding}"
def process(self, msg, kwargs):
return f'[{self.colored_botname}] {msg}', kwargs
class IcecastBot: class IcecastBot:
def __init__(self, config_path: Optional[str] = None): def __init__(self, config_path: Optional[str] = None):
# Load config # Load config
self.config = self.load_config(config_path) self.config = self.load_config(config_path)
# Set up bot-specific logger
self.logger = BotLoggerAdapter(
logging.getLogger(__name__),
{'botname': f'{self.config["irc"]["nick"]}@{self.config["stream"]["endpoint"]}'}
)
# Initialize IRC bot with config # Initialize IRC bot with config
self.bot = Client( self.bot = Client(
host=self.config['irc']['host'], host=self.config['irc']['host'],
@ -58,7 +186,9 @@ class IcecastBot:
with open(config_path) as f: with open(config_path) as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
except FileNotFoundError: except FileNotFoundError:
logger.warning(f"Config file not found at {config_path}, using defaults") # Create a temporary logger for config loading
temp_logger = logging.getLogger(__name__)
temp_logger.warning(f"Config file not found at {config_path}, using defaults")
config = { config = {
'irc': {}, 'irc': {},
'stream': {}, 'stream': {},
@ -79,14 +209,15 @@ class IcecastBot:
async def connected(): async def connected():
try: try:
self.channel = await self.bot.join(self.channel_name) self.channel = await self.bot.join(self.channel_name)
logger.info(f"Connected to IRC and joined {self.channel_name}") self.logger.info(f"Connected to IRC and joined {self.channel_name}")
except Exception as e: except Exception as e:
logger.error(f"Error joining channel: {e}") self.logger.error(f"Error joining channel: {e}")
asyncio.create_task(self.monitor_metadata()) asyncio.create_task(self.monitor_metadata())
@self.bot.on_join() @self.bot.on_join()
async def on_join(channel): async def on_join(channel):
# Silently store the channel without logging
if not self.channel: if not self.channel:
self.channel = channel self.channel = channel
@ -118,7 +249,7 @@ class IcecastBot:
return "Unable to fetch metadata" return "Unable to fetch metadata"
except Exception as e: except Exception as e:
logger.error(f"Error fetching JSON metadata: {e}") self.logger.error(f"Error fetching JSON metadata: {e}")
return "Error fetching metadata" return "Error fetching metadata"
async def monitor_metadata(self): async def monitor_metadata(self):
@ -140,7 +271,7 @@ class IcecastBot:
stderr=asyncio.subprocess.PIPE stderr=asyncio.subprocess.PIPE
) )
logger.info("Started stream monitoring") self.logger.info("Started stream monitoring")
buffer = b"" buffer = b""
last_json_check = time.time() last_json_check = time.time()
@ -156,14 +287,14 @@ class IcecastBot:
# Periodic health check # Periodic health check
if current_time - self.last_health_check >= self.health_check_interval: if current_time - self.last_health_check >= self.health_check_interval:
logger.info("Monitor status: Active - processing stream data") self.logger.info("Monitor status: Active - processing stream data")
self.last_health_check = current_time self.last_health_check = current_time
# Look for metadata marker but fetch from JSON # Look for metadata marker but fetch from JSON
if b"StreamTitle='" in buffer: if b"StreamTitle='" in buffer:
new_song = await self.fetch_json_metadata() new_song = await self.fetch_json_metadata()
if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song: if new_song and new_song != self.current_song and "Unable to fetch metadata" not in new_song:
logger.info(f"Now Playing: {new_song}") self.logger.info(f"Now Playing: {new_song}")
self.current_song = new_song self.current_song = new_song
await self.announce_song(new_song) await self.announce_song(new_song)
@ -181,7 +312,7 @@ class IcecastBot:
if "Unable to fetch metadata" in new_song: if "Unable to fetch metadata" in new_song:
break break
if new_song and new_song != self.current_song: if new_song and new_song != self.current_song:
logger.info(f"Now Playing (fallback): {new_song}") self.logger.info(f"Now Playing (fallback): {new_song}")
self.current_song = new_song self.current_song = new_song
await self.announce_song(new_song) await self.announce_song(new_song)
last_json_check = current_time last_json_check = current_time
@ -189,11 +320,11 @@ class IcecastBot:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
await process.wait() await process.wait()
logger.warning("Stream monitor ended, restarting...") self.logger.warning("Stream monitor ended, restarting...")
await asyncio.sleep(5) await asyncio.sleep(5)
except Exception as e: except Exception as e:
logger.error(f"Stream monitor error: {e}") self.logger.error(f"Stream monitor error: {e}")
await asyncio.sleep(5) await asyncio.sleep(5)
async def announce_song(self, song: str): async def announce_song(self, song: str):
@ -202,7 +333,7 @@ class IcecastBot:
if self.channel and self.should_announce_song(song): if self.channel and self.should_announce_song(song):
await self.channel.message(self.reply.format(song=song)) await self.channel.message(self.reply.format(song=song))
except Exception as e: except Exception as e:
logger.error(f"Error announcing song: {e}") self.logger.error(f"Error announcing song: {e}")
async def start(self): async def start(self):
await self.bot.run() await self.bot.run()