implemented

This commit is contained in:
cottongin 2025-03-08 17:30:19 -08:00
parent 0dd77a73df
commit df4888bc9a
Signed by: cottongin
GPG Key ID: A0BD18428A296890
2 changed files with 163 additions and 39 deletions

View File

@ -1 +1 @@
1.2.2
-testing

200
main.py
View File

@ -431,14 +431,26 @@ class IcecastBot:
# Check if recipient is a Channel object
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
# Always allow np command in private messages
# Only check allow_private_commands for other commands
try:
if is_channel:
# For private messages, we need to ensure we have a valid recipient
if not is_channel:
# In private messages, send the response to the sender
self.logger.debug(f"Sending now playing info to user {message.sender}")
await message.sender.message(self.reply.format(song=self.current_song))
else:
# In channels, send to the channel
self.logger.debug(f"Sending now playing info to channel {recipient}")
await recipient.message(self.reply.format(song=self.current_song))
except Exception as e:
pass
self.logger.error(f"Error sending now playing info: {str(e)}")
try:
# Try to send an error message back to the user
await message.sender.message(f"Error sending now playing info: {str(e)}")
except Exception:
pass
self.command_handlers['now_playing'] = now_playing
@self.bot.on_message(create_command_pattern('help'))
@ -456,8 +468,8 @@ class IcecastBot:
# Check if recipient is a Channel object
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
# Always allow help command in private messages
# Only check allow_private_commands for non-help commands
try:
# Parse message to check if a specific command was requested
@ -476,19 +488,23 @@ class IcecastBot:
if method_name:
handler = self.command_handlers.get(method_name)
if handler and handler.__doc__:
# Get the first line of the docstring
first_line = handler.__doc__.strip().split('\n')[0]
# Format it using the template and add (admin only) if needed
desc = first_line.split(':', 1)[1].strip()
help_text = self.help_specific_format.format(
prefix=self.cmd_prefix,
cmd=pattern,
desc=desc
)
# Check if user has permission for this command
if pattern in self.admin_commands and not self.is_admin(message.sender):
help_text = "You don't have permission to use this command."
# Check if command is hidden and we're in a channel
if is_channel and hasattr(handler, 'is_hidden_command') and handler.is_hidden_command:
help_text = f"Unknown command: {pattern}"
else:
# Get the first line of the docstring
first_line = handler.__doc__.strip().split('\n')[0]
# Format it using the template and add (admin only) if needed
desc = first_line.split(':', 1)[1].strip()
help_text = self.help_specific_format.format(
prefix=self.cmd_prefix,
cmd=pattern,
desc=desc
)
# Check if user has permission for this command
if pattern in self.admin_commands and not self.is_admin(message.sender):
help_text = "You don't have permission to use this command."
else:
help_text = f"No help available for command: {pattern}"
else:
@ -503,6 +519,10 @@ class IcecastBot:
if pattern not in self.admin_commands: # If not an admin command
handler = self.command_handlers.get(method_name)
if handler and handler.__doc__:
# Skip hidden commands in channel help
if is_channel and hasattr(handler, 'is_hidden_command') and handler.is_hidden_command:
continue
first_line = handler.__doc__.strip().split('\n')[0]
desc = first_line.split(':', 1)[1].strip()
general_commands.append(
@ -522,6 +542,10 @@ class IcecastBot:
if method_name:
handler = self.command_handlers.get(method_name)
if handler and handler.__doc__:
# Skip hidden commands in channel help
if is_channel and hasattr(handler, 'is_hidden_command') and handler.is_hidden_command:
continue
first_line = handler.__doc__.strip().split('\n')[0]
desc = first_line.split(':', 1)[1].strip()
# Don't add (admin only) in the list view
@ -536,10 +560,22 @@ class IcecastBot:
help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | " + " | ".join(formatted_groups)
if is_channel:
# Send the help text to the appropriate recipient
if not is_channel:
# In private messages, send the response to the sender
self.logger.debug(f"Sending help info to user {message.sender}")
await message.sender.message(help_text)
else:
# In channels, send to the channel
self.logger.debug(f"Sending help info to channel {recipient}")
await recipient.message(help_text)
except Exception as e:
pass
self.logger.error(f"Error sending help info: {str(e)}")
try:
# Try to send an error message back to the user
await message.sender.message(f"Error sending help info: {str(e)}")
except Exception:
pass
self.command_handlers['help_command'] = help_command
@self.bot.on_message(create_command_pattern('restart'))
@ -651,20 +687,59 @@ class IcecastBot:
@self.bot.on_message(create_command_pattern('unquiet'))
@self.admin_required
async def unquiet_bot(message):
"""!unquiet: Enable song announcements
"""!unquiet: Re-enable song announcements
Resumes announcing songs in the channel.
The bot must already be monitoring the stream.
The opposite of !quiet - allows the bot to resume announcing songs.
Args:
message: IRC message object
"""
self.should_announce = True
self.logger.info("Song announcements enabled by admin command")
if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
await message.recipient.message("Song announcements enabled. Bot will now announce songs.")
try:
self.should_announce = True
await message.recipient.message("Song announcements re-enabled.")
except Exception as e:
pass
self.command_handlers['unquiet_bot'] = unquiet_bot
@self.bot.on_message(create_command_pattern('announce'))
@self.hidden_command
@self.admin_required
async def announce_message(message):
"""!announce: Send a message from the bot to all channels
Allows admins to use the bot to send a message to all channels.
Can be used in private message or in a channel.
Usage: !announce <message>
Args:
message: IRC message object
"""
try:
# Extract the message to announce (everything after the command)
match = re.match(r"^.*?announce\s+(.*?)$", message.text)
if match and match.group(1).strip():
announcement = match.group(1).strip()
# Always send to the joined channel, regardless of where the command was received
if self.channel and hasattr(self.channel, 'name') and self.channel.name.startswith('#'):
await self.channel.message(announcement)
else:
# If we somehow don't have a valid channel, inform the admin
self.logger.warning("No valid channel to announce to")
# Only send this response if in a private message
if hasattr(message.recipient, 'name') and not message.recipient.name.startswith('#'):
await message.recipient.message("No valid channel to announce to.")
else:
await message.recipient.message("Usage: !announce <message>")
except Exception as e:
self.logger.error(f"Error in announce_message: {str(e)}")
try:
await message.recipient.message(f"Error processing announcement: {str(e)}")
except Exception:
pass
self.command_handlers['announce_message'] = announce_message
def _build_command_mappings(self):
"""Build bidirectional mappings between command patterns and method names.
@ -1210,19 +1285,44 @@ class IcecastBot:
base_cmds = f"\x02{prefix}np\x02 (current song) • \x02{prefix}help\x02 (this help)"
help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | Commands: {base_cmds}"
# Check if we're in a channel
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if self.is_admin(message.sender):
admin_cmds = (
admin_cmds = []
# Basic admin commands
basic_admin = (
f"\x02{prefix}start\x02/\x02stop\x02 (monitoring) • "
f"\x02{prefix}reconnect\x02 (stream) • "
f"\x02{prefix}restart\x02 (bot) • "
f"\x02{prefix}quit\x02 (shutdown)"
)
help_text += f" | Admin: {admin_cmds}"
admin_cmds.append(basic_admin)
# Add announce command only in private messages
if not is_channel:
admin_cmds.append(f"\x02{prefix}announce\x02 (send message)")
help_text += f" | Admin: {''.join(admin_cmds)}"
if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
await message.recipient.message(help_text)
# Send the help text to the appropriate recipient
if not is_channel:
# In private messages, send the response to the sender
self.logger.debug(f"Sending fallback help info to user {message.sender}")
await message.sender.message(help_text)
else:
# In channels, send to the channel
self.logger.debug(f"Sending fallback help info to channel {recipient}")
await recipient.message(help_text)
except Exception as e:
pass
self.logger.error(f"Error sending fallback help info: {str(e)}")
try:
# Try to send an error message back to the user
await message.sender.message(f"Error sending fallback help info: {str(e)}")
except Exception:
pass
def admin_required(self, f):
"""Decorator to mark a command as requiring admin privileges.
@ -1240,12 +1340,16 @@ class IcecastBot:
recipient = getattr(message, 'recipient', None)
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
if not self.allow_private_commands and not is_channel:
return
# For admin commands, allow them in private messages regardless of allow_private_commands setting
# Just check if the user is an admin
if not is_channel: # If it's a private message
if not self.is_admin(message.sender):
return # Silently ignore if not an admin
return await f(message, *args, **kwargs) # Allow command if admin
# Channel message handling remains the same
if not self.is_admin(message.sender):
if is_channel:
await recipient.message("You don't have permission to use this command.")
await recipient.message("You don't have permission to use this command.")
return
return await f(message, *args, **kwargs)
@ -1254,12 +1358,32 @@ class IcecastBot:
wrapped.__doc__ = f.__doc__
wrapped.__name__ = f.__name__
# Preserve the is_hidden_command attribute if it exists
if hasattr(f, 'is_hidden_command'):
wrapped.is_hidden_command = f.is_hidden_command
# Add the command pattern to admin_commands set
if f.__doc__ and f.__doc__.strip().startswith('!'):
pattern = f.__doc__.strip().split(':', 1)[0].strip('!')
self.admin_commands.add(pattern)
return wrapped
def hidden_command(self, f):
"""Decorator to mark a command as hidden from help output in channels.
Hidden commands will only appear in help output when the help command
is used in a private message.
Args:
f: The command handler function to wrap.
Returns:
The wrapped function with a hidden flag.
"""
# Set a flag on the function to mark it as hidden
f.is_hidden_command = True
return f
async def run_multiple_bots(config_paths: List[str]):
"""Run multiple bot instances concurrently.