Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| df4888bc9a |
200
main.py
200
main.py
@ -431,14 +431,26 @@ class IcecastBot:
|
||||
# Check if recipient is a Channel object
|
||||
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
|
||||
|
||||
if not self.allow_private_commands and not is_channel:
|
||||
return
|
||||
# Always allow np command in private messages
|
||||
# Only check allow_private_commands for other commands
|
||||
|
||||
try:
|
||||
if is_channel:
|
||||
# For private messages, we need to ensure we have a valid recipient
|
||||
if not is_channel:
|
||||
# In private messages, send the response to the sender
|
||||
self.logger.debug(f"Sending now playing info to user {message.sender}")
|
||||
await message.sender.message(self.reply.format(song=self.current_song))
|
||||
else:
|
||||
# In channels, send to the channel
|
||||
self.logger.debug(f"Sending now playing info to channel {recipient}")
|
||||
await recipient.message(self.reply.format(song=self.current_song))
|
||||
except Exception as e:
|
||||
pass
|
||||
self.logger.error(f"Error sending now playing info: {str(e)}")
|
||||
try:
|
||||
# Try to send an error message back to the user
|
||||
await message.sender.message(f"Error sending now playing info: {str(e)}")
|
||||
except Exception:
|
||||
pass
|
||||
self.command_handlers['now_playing'] = now_playing
|
||||
|
||||
@self.bot.on_message(create_command_pattern('help'))
|
||||
@ -456,8 +468,8 @@ class IcecastBot:
|
||||
# Check if recipient is a Channel object
|
||||
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
|
||||
|
||||
if not self.allow_private_commands and not is_channel:
|
||||
return
|
||||
# Always allow help command in private messages
|
||||
# Only check allow_private_commands for non-help commands
|
||||
|
||||
try:
|
||||
# Parse message to check if a specific command was requested
|
||||
@ -476,19 +488,23 @@ class IcecastBot:
|
||||
if method_name:
|
||||
handler = self.command_handlers.get(method_name)
|
||||
if handler and handler.__doc__:
|
||||
# Get the first line of the docstring
|
||||
first_line = handler.__doc__.strip().split('\n')[0]
|
||||
# Format it using the template and add (admin only) if needed
|
||||
desc = first_line.split(':', 1)[1].strip()
|
||||
help_text = self.help_specific_format.format(
|
||||
prefix=self.cmd_prefix,
|
||||
cmd=pattern,
|
||||
desc=desc
|
||||
)
|
||||
|
||||
# Check if user has permission for this command
|
||||
if pattern in self.admin_commands and not self.is_admin(message.sender):
|
||||
help_text = "You don't have permission to use this command."
|
||||
# Check if command is hidden and we're in a channel
|
||||
if is_channel and hasattr(handler, 'is_hidden_command') and handler.is_hidden_command:
|
||||
help_text = f"Unknown command: {pattern}"
|
||||
else:
|
||||
# Get the first line of the docstring
|
||||
first_line = handler.__doc__.strip().split('\n')[0]
|
||||
# Format it using the template and add (admin only) if needed
|
||||
desc = first_line.split(':', 1)[1].strip()
|
||||
help_text = self.help_specific_format.format(
|
||||
prefix=self.cmd_prefix,
|
||||
cmd=pattern,
|
||||
desc=desc
|
||||
)
|
||||
|
||||
# Check if user has permission for this command
|
||||
if pattern in self.admin_commands and not self.is_admin(message.sender):
|
||||
help_text = "You don't have permission to use this command."
|
||||
else:
|
||||
help_text = f"No help available for command: {pattern}"
|
||||
else:
|
||||
@ -503,6 +519,10 @@ class IcecastBot:
|
||||
if pattern not in self.admin_commands: # If not an admin command
|
||||
handler = self.command_handlers.get(method_name)
|
||||
if handler and handler.__doc__:
|
||||
# Skip hidden commands in channel help
|
||||
if is_channel and hasattr(handler, 'is_hidden_command') and handler.is_hidden_command:
|
||||
continue
|
||||
|
||||
first_line = handler.__doc__.strip().split('\n')[0]
|
||||
desc = first_line.split(':', 1)[1].strip()
|
||||
general_commands.append(
|
||||
@ -522,6 +542,10 @@ class IcecastBot:
|
||||
if method_name:
|
||||
handler = self.command_handlers.get(method_name)
|
||||
if handler and handler.__doc__:
|
||||
# Skip hidden commands in channel help
|
||||
if is_channel and hasattr(handler, 'is_hidden_command') and handler.is_hidden_command:
|
||||
continue
|
||||
|
||||
first_line = handler.__doc__.strip().split('\n')[0]
|
||||
desc = first_line.split(':', 1)[1].strip()
|
||||
# Don't add (admin only) in the list view
|
||||
@ -536,10 +560,22 @@ class IcecastBot:
|
||||
|
||||
help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | " + " | ".join(formatted_groups)
|
||||
|
||||
if is_channel:
|
||||
# Send the help text to the appropriate recipient
|
||||
if not is_channel:
|
||||
# In private messages, send the response to the sender
|
||||
self.logger.debug(f"Sending help info to user {message.sender}")
|
||||
await message.sender.message(help_text)
|
||||
else:
|
||||
# In channels, send to the channel
|
||||
self.logger.debug(f"Sending help info to channel {recipient}")
|
||||
await recipient.message(help_text)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.logger.error(f"Error sending help info: {str(e)}")
|
||||
try:
|
||||
# Try to send an error message back to the user
|
||||
await message.sender.message(f"Error sending help info: {str(e)}")
|
||||
except Exception:
|
||||
pass
|
||||
self.command_handlers['help_command'] = help_command
|
||||
|
||||
@self.bot.on_message(create_command_pattern('restart'))
|
||||
@ -651,20 +687,59 @@ class IcecastBot:
|
||||
@self.bot.on_message(create_command_pattern('unquiet'))
|
||||
@self.admin_required
|
||||
async def unquiet_bot(message):
|
||||
"""!unquiet: Enable song announcements
|
||||
"""!unquiet: Re-enable song announcements
|
||||
|
||||
Resumes announcing songs in the channel.
|
||||
The bot must already be monitoring the stream.
|
||||
The opposite of !quiet - allows the bot to resume announcing songs.
|
||||
|
||||
Args:
|
||||
message: IRC message object
|
||||
"""
|
||||
self.should_announce = True
|
||||
self.logger.info("Song announcements enabled by admin command")
|
||||
if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
|
||||
await message.recipient.message("Song announcements enabled. Bot will now announce songs.")
|
||||
try:
|
||||
self.should_announce = True
|
||||
await message.recipient.message("Song announcements re-enabled.")
|
||||
except Exception as e:
|
||||
pass
|
||||
self.command_handlers['unquiet_bot'] = unquiet_bot
|
||||
|
||||
@self.bot.on_message(create_command_pattern('announce'))
|
||||
@self.hidden_command
|
||||
@self.admin_required
|
||||
async def announce_message(message):
|
||||
"""!announce: Send a message from the bot to all channels
|
||||
|
||||
Allows admins to use the bot to send a message to all channels.
|
||||
Can be used in private message or in a channel.
|
||||
|
||||
Usage: !announce <message>
|
||||
|
||||
Args:
|
||||
message: IRC message object
|
||||
"""
|
||||
try:
|
||||
# Extract the message to announce (everything after the command)
|
||||
match = re.match(r"^.*?announce\s+(.*?)$", message.text)
|
||||
if match and match.group(1).strip():
|
||||
announcement = match.group(1).strip()
|
||||
|
||||
# Always send to the joined channel, regardless of where the command was received
|
||||
if self.channel and hasattr(self.channel, 'name') and self.channel.name.startswith('#'):
|
||||
await self.channel.message(announcement)
|
||||
else:
|
||||
# If we somehow don't have a valid channel, inform the admin
|
||||
self.logger.warning("No valid channel to announce to")
|
||||
# Only send this response if in a private message
|
||||
if hasattr(message.recipient, 'name') and not message.recipient.name.startswith('#'):
|
||||
await message.recipient.message("No valid channel to announce to.")
|
||||
else:
|
||||
await message.recipient.message("Usage: !announce <message>")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error in announce_message: {str(e)}")
|
||||
try:
|
||||
await message.recipient.message(f"Error processing announcement: {str(e)}")
|
||||
except Exception:
|
||||
pass
|
||||
self.command_handlers['announce_message'] = announce_message
|
||||
|
||||
def _build_command_mappings(self):
|
||||
"""Build bidirectional mappings between command patterns and method names.
|
||||
|
||||
@ -1210,19 +1285,44 @@ class IcecastBot:
|
||||
base_cmds = f"\x02{prefix}np\x02 (current song) • \x02{prefix}help\x02 (this help)"
|
||||
help_text = f"\x02Icecast Bot v{self.VERSION}\x02 | Commands: {base_cmds}"
|
||||
|
||||
# Check if we're in a channel
|
||||
recipient = getattr(message, 'recipient', None)
|
||||
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
|
||||
|
||||
if self.is_admin(message.sender):
|
||||
admin_cmds = (
|
||||
admin_cmds = []
|
||||
|
||||
# Basic admin commands
|
||||
basic_admin = (
|
||||
f"\x02{prefix}start\x02/\x02stop\x02 (monitoring) • "
|
||||
f"\x02{prefix}reconnect\x02 (stream) • "
|
||||
f"\x02{prefix}restart\x02 (bot) • "
|
||||
f"\x02{prefix}quit\x02 (shutdown)"
|
||||
)
|
||||
help_text += f" | Admin: {admin_cmds}"
|
||||
admin_cmds.append(basic_admin)
|
||||
|
||||
# Add announce command only in private messages
|
||||
if not is_channel:
|
||||
admin_cmds.append(f"\x02{prefix}announce\x02 (send message)")
|
||||
|
||||
help_text += f" | Admin: {' • '.join(admin_cmds)}"
|
||||
|
||||
if hasattr(message.recipient, 'name') and message.recipient.name.startswith('#'):
|
||||
await message.recipient.message(help_text)
|
||||
# Send the help text to the appropriate recipient
|
||||
if not is_channel:
|
||||
# In private messages, send the response to the sender
|
||||
self.logger.debug(f"Sending fallback help info to user {message.sender}")
|
||||
await message.sender.message(help_text)
|
||||
else:
|
||||
# In channels, send to the channel
|
||||
self.logger.debug(f"Sending fallback help info to channel {recipient}")
|
||||
await recipient.message(help_text)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.logger.error(f"Error sending fallback help info: {str(e)}")
|
||||
try:
|
||||
# Try to send an error message back to the user
|
||||
await message.sender.message(f"Error sending fallback help info: {str(e)}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def admin_required(self, f):
|
||||
"""Decorator to mark a command as requiring admin privileges.
|
||||
@ -1240,12 +1340,16 @@ class IcecastBot:
|
||||
recipient = getattr(message, 'recipient', None)
|
||||
is_channel = hasattr(recipient, 'name') and recipient.name.startswith('#')
|
||||
|
||||
if not self.allow_private_commands and not is_channel:
|
||||
return
|
||||
|
||||
# For admin commands, allow them in private messages regardless of allow_private_commands setting
|
||||
# Just check if the user is an admin
|
||||
if not is_channel: # If it's a private message
|
||||
if not self.is_admin(message.sender):
|
||||
return # Silently ignore if not an admin
|
||||
return await f(message, *args, **kwargs) # Allow command if admin
|
||||
|
||||
# Channel message handling remains the same
|
||||
if not self.is_admin(message.sender):
|
||||
if is_channel:
|
||||
await recipient.message("You don't have permission to use this command.")
|
||||
await recipient.message("You don't have permission to use this command.")
|
||||
return
|
||||
|
||||
return await f(message, *args, **kwargs)
|
||||
@ -1254,12 +1358,32 @@ class IcecastBot:
|
||||
wrapped.__doc__ = f.__doc__
|
||||
wrapped.__name__ = f.__name__
|
||||
|
||||
# Preserve the is_hidden_command attribute if it exists
|
||||
if hasattr(f, 'is_hidden_command'):
|
||||
wrapped.is_hidden_command = f.is_hidden_command
|
||||
|
||||
# Add the command pattern to admin_commands set
|
||||
if f.__doc__ and f.__doc__.strip().startswith('!'):
|
||||
pattern = f.__doc__.strip().split(':', 1)[0].strip('!')
|
||||
self.admin_commands.add(pattern)
|
||||
|
||||
return wrapped
|
||||
|
||||
def hidden_command(self, f):
|
||||
"""Decorator to mark a command as hidden from help output in channels.
|
||||
|
||||
Hidden commands will only appear in help output when the help command
|
||||
is used in a private message.
|
||||
|
||||
Args:
|
||||
f: The command handler function to wrap.
|
||||
|
||||
Returns:
|
||||
The wrapped function with a hidden flag.
|
||||
"""
|
||||
# Set a flag on the function to mark it as hidden
|
||||
f.is_hidden_command = True
|
||||
return f
|
||||
|
||||
async def run_multiple_bots(config_paths: List[str]):
|
||||
"""Run multiple bot instances concurrently.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user