Merge pull request #336 from RockChinQ/cmds-permission-ctrl

[Refactor&Feat] 命令节点权限控制
This commit is contained in:
Rock Chin 2023-03-31 15:18:44 +08:00 committed by GitHub
commit 1acaf4e58b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1252 additions and 748 deletions

View File

@ -0,0 +1,48 @@
name: Update cmdpriv-template
on:
push:
paths:
- 'pkg/qqbot/cmds/**'
pull_request:
types: [closed]
paths:
- 'pkg/qqbot/cmds/**'
jobs:
update-cmdpriv-template:
if: github.event.pull_request.merged == true || github.event_name == 'push'
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.x
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run generate_cmdpriv_template.py
run: python3 generate_cmdpriv_template.py
- name: Check for changes in cmdpriv-template.json
id: check_changes
run: |
if git diff --name-only | grep -q "cmdpriv-template.json"; then
echo "::set-output name=changes_detected::true"
else
echo "::set-output name=changes_detected::false"
fi
- name: Commit changes to cmdpriv-template.json
if: steps.check_changes.outputs.changes_detected == 'true'
run: |
git config --global user.name "GitHub Actions Bot"
git config --global user.email "<github-actions@github.com>"
git add cmdpriv-template.json
git commit -m "Update cmdpriv-template.json"
git push

3
.gitignore vendored
View File

@ -16,4 +16,5 @@ scenario/
!scenario/default-template.json
override.json
cookies.json
res/announcement_saved
res/announcement_saved
cmdpriv.json

@ -1 +1 @@
Subproject commit d7ba01503c9c74254e987ffe53a0c8931f34037a
Subproject commit 1e3c599c0313acb6b1511087c0e579ee8b22dd2c

28
cmdpriv-template.json Normal file
View File

@ -0,0 +1,28 @@
{
"comment": "以下为命令权限请设置到cmdpriv.json中。关于此功能的说明请查看https://github.com/RockChinQ/QChatGPT/wiki/%E5%8A%9F%E8%83%BD%E4%BD%BF%E7%94%A8#%E5%91%BD%E4%BB%A4%E6%9D%83%E9%99%90%E6%8E%A7%E5%88%B6",
"draw": 1,
"plugin": 2,
"plugin.get": 2,
"plugin.update": 2,
"plugin.del": 2,
"plugin.off": 2,
"plugin.on": 2,
"default": 1,
"default.set": 2,
"del": 1,
"del.all": 1,
"delhst": 2,
"delhst.all": 2,
"last": 1,
"list": 1,
"next": 1,
"prompt": 1,
"resend": 1,
"reset": 1,
"cfg": 2,
"help": 1,
"reload": 2,
"update": 2,
"usage": 1,
"version": 1
}

View File

@ -0,0 +1,17 @@
import pkg.qqbot.cmds.mgr as cmdsmgr
import json
# 执行命令模块的注册
cmdsmgr.register_all()
# 生成限权文件模板
template: dict[str, int] = {
"comment": "以下为命令权限请设置到cmdpriv.json中。关于此功能的说明请查看https://github.com/RockChinQ/QChatGPT/wiki/%E5%8A%9F%E8%83%BD%E4%BD%BF%E7%94%A8#%E5%91%BD%E4%BB%A4%E6%9D%83%E9%99%90%E6%8E%A7%E5%88%B6",
}
for key in cmdsmgr.__command_list__:
template[key] = cmdsmgr.__command_list__[key]['privilege']
# 写入cmdpriv-template.json
with open('cmdpriv-template.json', 'w') as f:
f.write(json.dumps(template, indent=4, ensure_ascii=False))

18
main.py
View File

@ -7,6 +7,7 @@ import time
import logging
import sys
import traceback
sys.path.append(".")
@ -192,8 +193,15 @@ def start(first_time_init=False):
import pkg.openai.session
import pkg.qqbot.manager
import pkg.openai.dprompt
import pkg.qqbot.cmds.mgr
pkg.openai.dprompt.register_all()
try:
pkg.openai.dprompt.register_all()
pkg.qqbot.cmds.mgr.register_all()
pkg.qqbot.cmds.mgr.apply_privileges()
except Exception as e:
logging.error(e)
traceback.print_exc()
# 配置openai api_base
if "reverse_proxy" in config.openai_config and config.openai_config["reverse_proxy"] is not None:
@ -272,10 +280,6 @@ def start(first_time_init=False):
threading.Thread(
target=run_bot_wrapper
).start()
# 机器人暂时不能放在线程池中
# pkg.utils.context.get_thread_ctl().submit_sys_task(
# run_bot_wrapper
# )
finally:
# 判断若是Windows输出选择模式可能会暂停程序的警告
if os.name == 'nt':
@ -370,6 +374,10 @@ def check_file():
if not os.path.exists("scenario/default.json"):
shutil.copy("scenario/default-template.json", "scenario/default.json")
# 检查cmdpriv.json
if not os.path.exists("cmdpriv.json"):
shutil.copy("cmdpriv-template.json", "cmdpriv.json")
# 检查temp目录
if not os.path.exists("temp/"):
os.mkdir("temp/")

View File

@ -1,36 +0,0 @@
from pkg.qqbot.cmds.model import command
import logging
from mirai import Image
import config
import pkg.openai.session
@command(
"draw",
"使用DALL·E模型作画",
"!draw <图片提示语>",
[],
False
)
def cmd_draw(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""使用DALL·E模型作画"""
reply = []
if len(params) == 0:
reply = ["[bot]err:请输入图片描述文字"]
else:
session = pkg.openai.session.get_session(session_name)
res = session.draw_image(" ".join(params))
logging.debug("draw_image result:{}".format(res))
reply = [Image(url=res['data'][0]['url'])]
if not (hasattr(config, 'include_image_description')
and not config.include_image_description):
reply.append(" ".join(params))
return reply

View File

View File

@ -0,0 +1,35 @@
from ..mgr import AbstractCommandNode, Context
import logging
from mirai import Image
import config
@AbstractCommandNode.register(
parent=None,
name="draw",
description="使用DALL·E生成图片",
usage="!draw <图片提示语>",
aliases=[],
privilege=1
)
class DrawCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
reply = []
if len(ctx.params) == 0:
reply = ["[bot]err: 未提供图片描述文字"]
else:
session = pkg.openai.session.get_session(ctx.session_name)
res = session.draw_image(" ".join(ctx.params))
logging.debug("draw_image result:{}".format(res))
reply = [Image(url=res['data'][0]['url'])]
if not (hasattr(config, 'include_image_description')
and not config.include_image_description):
reply.append(" ".join(ctx.params))
return True, reply

326
pkg/qqbot/cmds/mgr.py Normal file
View File

@ -0,0 +1,326 @@
import importlib
import inspect
import logging
import copy
import pkgutil
import traceback
import types
import json
__command_list__ = {}
"""命令树
结构
{
'cmd1': {
'description': 'cmd1 description',
'usage': 'cmd1 usage',
'aliases': ['cmd1 alias1', 'cmd1 alias2'],
'privilege': 0,
'parent': None,
'cls': <class 'pkg.qqbot.cmds.cmd1.CommandCmd1'>,
'sub': [
'cmd1-1'
]
},
'cmd1.cmd1-1: {
'description': 'cmd1-1 description',
'usage': 'cmd1-1 usage',
'aliases': ['cmd1-1 alias1', 'cmd1-1 alias2'],
'privilege': 0,
'parent': 'cmd1',
'cls': <class 'pkg.qqbot.cmds.cmd1.CommandCmd1_1'>,
'sub': []
},
'cmd2': {
'description': 'cmd2 description',
'usage': 'cmd2 usage',
'aliases': ['cmd2 alias1', 'cmd2 alias2'],
'privilege': 0,
'parent': None,
'cls': <class 'pkg.qqbot.cmds.cmd2.CommandCmd2'>,
'sub': [
'cmd2-1'
]
},
'cmd2.cmd2-1': {
'description': 'cmd2-1 description',
'usage': 'cmd2-1 usage',
'aliases': ['cmd2-1 alias1', 'cmd2-1 alias2'],
'privilege': 0,
'parent': 'cmd2',
'cls': <class 'pkg.qqbot.cmds.cmd2.CommandCmd2_1'>,
'sub': [
'cmd2-1-1'
]
},
'cmd2.cmd2-1.cmd2-1-1': {
'description': 'cmd2-1-1 description',
'usage': 'cmd2-1-1 usage',
'aliases': ['cmd2-1-1 alias1', 'cmd2-1-1 alias2'],
'privilege': 0,
'parent': 'cmd2.cmd2-1',
'cls': <class 'pkg.qqbot.cmds.cmd2.CommandCmd2_1_1'>,
'sub': []
},
}
"""
__tree_index__: dict[str, list] = {}
"""命令树索引
结构
{
'pkg.qqbot.cmds.cmd1.CommandCmd1': 'cmd1', # 顶级指令
'pkg.qqbot.cmds.cmd1.CommandCmd1_1': 'cmd1.cmd1-1', # 类名: 节点路径
'pkg.qqbot.cmds.cmd2.CommandCmd2': 'cmd2',
'pkg.qqbot.cmds.cmd2.CommandCmd2_1': 'cmd2.cmd2-1',
'pkg.qqbot.cmds.cmd2.CommandCmd2_1_1': 'cmd2.cmd2-1.cmd2-1-1',
}
"""
class Context:
"""命令执行上下文"""
command: str
"""顶级指令文本"""
crt_command: str
"""当前子指令文本"""
params: list
"""完整参数列表"""
crt_params: list
"""当前子指令参数列表"""
session_name: str
"""会话名"""
text_message: str
"""指令完整文本"""
launcher_type: str
"""指令发起者类型"""
launcher_id: int
"""指令发起者ID"""
sender_id: int
"""指令发送者ID"""
is_admin: bool
"""[过时]指令发送者是否为管理员"""
privilege: int
"""指令发送者权限等级"""
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class AbstractCommandNode:
"""指令抽象类"""
parent: type
"""父指令类"""
name: str
"""指令名"""
description: str
"""指令描述"""
usage: str
"""指令用法"""
aliases: list[str]
"""指令别名"""
privilege: int
"""指令权限等级, 权限大于等于此值的用户才能执行指令"""
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
"""指令处理函数
:param ctx: 指令执行上下文
:return: (是否执行, 回复列表(若执行))
若未执行将自动以下一个参数查找并执行子指令
"""
raise NotImplementedError
@classmethod
def help(cls) -> str:
"""获取指令帮助信息"""
return '指令: {}\n描述: {}\n用法: \n{}\n别名: {}\n权限: {}'.format(
cls.name,
cls.description,
cls.usage,
', '.join(cls.aliases),
cls.privilege
)
@staticmethod
def register(
parent: type = None,
name: str = None,
description: str = None,
usage: str = None,
aliases: list[str] = None,
privilege: int = 0
):
"""注册指令
:param cls: 指令类
:param name: 指令名
:param parent: 父指令类
"""
global __command_list__, __tree_index__
def wrapper(cls):
cls.name = name
cls.parent = parent
cls.description = description
cls.usage = usage
cls.aliases = aliases
cls.privilege = privilege
logging.debug("cls: {}, name: {}, parent: {}".format(cls, name, parent))
if parent is None:
# 顶级指令注册
__command_list__[name] = {
'description': cls.description,
'usage': cls.usage,
'aliases': cls.aliases,
'privilege': cls.privilege,
'parent': None,
'cls': cls,
'sub': []
}
# 更新索引
__tree_index__[cls.__module__ + '.' + cls.__name__] = name
else:
# 获取父节点名称
path = __tree_index__[parent.__module__ + '.' + parent.__name__]
parent_node = __command_list__[path]
# 链接父子指令
__command_list__[path]['sub'].append(name)
# 注册子指令
__command_list__[path + '.' + name] = {
'description': cls.description,
'usage': cls.usage,
'aliases': cls.aliases,
'privilege': cls.privilege,
'parent': path,
'cls': cls,
'sub': []
}
# 更新索引
__tree_index__[cls.__module__ + '.' + cls.__name__] = path + '.' + name
return cls
return wrapper
class CommandPrivilegeError(Exception):
"""指令权限不足或不存在异常"""
pass
# 传入Context对象广搜命令树返回执行结果
# 若命令被处理返回reply列表
# 若命令未被处理,继续执行下一级指令
# 若命令不存在,报异常
def execute(context: Context) -> list:
"""执行指令
:param ctx: 指令执行上下文
:return: 回复列表
"""
global __command_list__
# 拷贝ctx
ctx: Context = copy.deepcopy(context)
# 从树取出顶级指令
node = __command_list__
path = ctx.command
while True:
try:
logging.debug('执行指令: {}'.format(path))
node = __command_list__[path]
# 检查权限
if ctx.privilege < node['privilege']:
raise CommandPrivilegeError('权限不足: {}'.format(path))
# 执行
execed, reply = node['cls'].process(ctx)
if execed:
return reply
else:
# 删除crt_params第一个参数
ctx.crt_command = ctx.crt_params.pop(0)
# 下一个path
path = path + '.' + ctx.crt_command
except KeyError:
traceback.print_exc()
raise CommandPrivilegeError('找不到指令: {}'.format(path))
def register_all():
"""启动时调用此函数注册所有指令
递归处理pkg.qqbot.cmds包下及其子包下所有模块的所有继承于AbstractCommand的类
"""
# 模块遍历其中的继承于AbstractCommand的类进行注册
# 包:递归处理包下的模块
# 排除__开头的属性
global __command_list__, __tree_index__
import pkg.qqbot.cmds
def walk(module, prefix, path_prefix):
# 排除不处于pkg.qqbot.cmds中的包
if not module.__name__.startswith('pkg.qqbot.cmds'):
return
logging.debug('walk: {}, path: {}'.format(module.__name__, module.__path__))
for item in pkgutil.iter_modules(module.__path__):
if item.name.startswith('__'):
continue
if item.ispkg:
walk(__import__(module.__name__ + '.' + item.name, fromlist=['']), prefix + item.name + '.', path_prefix + item.name + '/')
else:
m = __import__(module.__name__ + '.' + item.name, fromlist=[''])
# for name, cls in inspect.getmembers(m, inspect.isclass):
# # 检查是否为指令类
# if cls.__module__ == m.__name__ and issubclass(cls, AbstractCommandNode) and cls != AbstractCommandNode:
# cls.register(cls, cls.name, cls.parent)
walk(pkg.qqbot.cmds, '', '')
logging.debug(__command_list__)
def apply_privileges():
"""读取cmdpriv.json并应用指令权限"""
with open('cmdpriv.json', 'r') as f:
data = json.load(f)
for path, priv in data.items():
if path == 'comment':
continue
if __command_list__[path]['privilege'] != priv:
logging.debug('应用权限: {} -> {}(default: {})'.format(path, priv, __command_list__[path]['privilege']))
__command_list__[path]['privilege'] = priv

View File

@ -1,45 +0,0 @@
# 指令模型
import logging
commands = []
"""已注册的指令类
{
"name": "指令名",
"description": "指令描述",
"usage": "指令用法",
"aliases": ["别名1", "别名2"],
"admin_only": "是否仅管理员可用",
"func": "指令执行函数"
}
"""
def command(name: str, description: str, usage: str, aliases: list = None, admin_only: bool = False):
"""指令装饰器"""
def wrapper(fun):
commands.append({
"name": name,
"description": description,
"usage": usage,
"aliases": aliases,
"admin_only": admin_only,
"func": fun
})
return fun
return wrapper
def search(cmd: str) -> dict:
"""查找指令"""
for command in commands:
if (command["name"] == cmd) or (cmd in command["aliases"]):
return command
return None
import pkg.qqbot.cmds.func
import pkg.qqbot.cmds.system
import pkg.qqbot.cmds.session
import pkg.qqbot.cmds.plugin

View File

@ -1,129 +0,0 @@
from pkg.qqbot.cmds.model import command
import pkg.utils.context
import pkg.plugin.switch as plugin_switch
import os
import threading
import logging
def plugin_operation(cmd, params, is_admin):
reply = []
import pkg.plugin.host as plugin_host
import pkg.utils.updater as updater
plugin_list = plugin_host.__plugins__
if len(params) == 0:
reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__))
idx = 0
for key in plugin_host.iter_plugins_name():
plugin = plugin_list[key]
reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\
.format((idx+1), plugin['name'],
"[已禁用]" if not plugin['enabled'] else "",
plugin['description'],
plugin['version'], plugin['author'])
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1]))
if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT":
reply_str += "源码: "+remote_url+"\n"
idx += 1
reply = [reply_str]
elif params[0] == 'update':
# 更新所有插件
if is_admin:
def closure():
import pkg.utils.context
updated = []
for key in plugin_list:
plugin = plugin_list[key]
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1]))
if success:
updated.append(plugin['name'])
# 检查是否有requirements.txt
pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...")
for key in plugin_list:
plugin = plugin_list[key]
if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"):
logging.info("{}检测到requirements.txt安装依赖".format(plugin['name']))
import pkg.utils.pkgmgr
pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt")
import main
main.reset_logging()
pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated)))
threading.Thread(target=closure).start()
reply = ["[bot]正在更新所有插件,请勿重复发起..."]
else:
reply = ["[bot]err:权限不足"]
elif params[0] == 'del' or params[0] == 'delete':
if is_admin:
if len(params) < 2:
reply = ["[bot]err:未指定插件名"]
else:
plugin_name = params[1]
if plugin_name in plugin_list:
unin_path = plugin_host.uninstall_plugin(plugin_name)
reply = ["[bot]已删除插件: {} ({}), 请发送 !reload 重载插件".format(plugin_name, unin_path)]
else:
reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
elif params[0] == 'on' or params[0] == 'off' :
new_status = params[0] == 'on'
if is_admin:
if len(params) < 2:
reply = ["[bot]err:未指定插件名"]
else:
plugin_name = params[1]
if plugin_name in plugin_list:
plugin_list[plugin_name]['enabled'] = new_status
plugin_switch.dump_switch()
reply = ["[bot]已{}插件: {}".format("启用" if new_status else "禁用", plugin_name)]
else:
reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
elif params[0].startswith("http"):
if is_admin:
def closure():
try:
plugin_host.install_plugin(params[0])
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件")
except Exception as e:
logging.error("插件安装失败:{}".format(e))
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e))
threading.Thread(target=closure, args=()).start()
reply = ["[bot]正在安装插件..."]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
else:
reply = ["[bot]err:未知参数: {}".format(params)]
return reply
@command(
"plugin",
"插件相关操作",
"!plugin\n!plugin <插件仓库地址>\!plugin update\n!plugin del <插件名>\n!plugin on <插件名>\n!plugin off <插件名>",
[],
False
)
def cmd_plugin(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""插件相关操作"""
reply = plugin_operation(cmd, params, is_admin)
return reply

View File

View File

@ -0,0 +1,195 @@
from ..mgr import AbstractCommandNode, Context
import os
import pkg.plugin.host as plugin_host
import pkg.utils.updater as updater
@AbstractCommandNode.register(
parent=None,
name="plugin",
description="插件管理",
usage="!plugin\n!plugin get <插件仓库地址>\!plugin update\n!plugin del <插件名>\n!plugin on <插件名>\n!plugin off <插件名>",
aliases=[],
privilege=2
)
class PluginCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
reply = []
plugin_list = plugin_host.__plugins__
if len(ctx.params) == 0:
# 列出所有插件
reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__))
idx = 0
for key in plugin_host.iter_plugins_name():
plugin = plugin_list[key]
reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\
.format((idx+1), plugin['name'],
"[已禁用]" if not plugin['enabled'] else "",
plugin['description'],
plugin['version'], plugin['author'])
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1]))
if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT":
reply_str += "源码: "+remote_url+"\n"
idx += 1
reply = [reply_str]
return True, reply
elif ctx.params[0].startswith("http"):
reply = ["[bot]err: 此命令已启用,请使用 !plugin get <插件仓库地址> 进行安装"]
return True, reply
else:
return False, []
@AbstractCommandNode.register(
parent=PluginCommand,
name="get",
description="安装插件",
usage="!plugin get <插件仓库地址>",
aliases=[],
privilege=2
)
class PluginGetCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import threading
import logging
import pkg.utils.context
if len(ctx.crt_params) == 0:
reply = ["[bot]err: 请提供插件仓库地址"]
return True, reply
reply = []
def closure():
try:
plugin_host.install_plugin(ctx.crt_params[0])
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件")
except Exception as e:
logging.error("插件安装失败:{}".format(e))
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e))
threading.Thread(target=closure, args=()).start()
reply = ["[bot]正在安装插件..."]
return True, reply
@AbstractCommandNode.register(
parent=PluginCommand,
name="update",
description="更新所有插件",
usage="!plugin update",
aliases=[],
privilege=2
)
class PluginUpdateCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import threading
import logging
plugin_list = plugin_host.__plugins__
reply = []
def closure():
import pkg.utils.context
updated = []
for key in plugin_list:
plugin = plugin_list[key]
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1]))
if success:
updated.append(plugin['name'])
# 检查是否有requirements.txt
pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...")
for key in plugin_list:
plugin = plugin_list[key]
if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"):
logging.info("{}检测到requirements.txt安装依赖".format(plugin['name']))
import pkg.utils.pkgmgr
pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt")
import main
main.reset_logging()
pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated)))
threading.Thread(target=closure).start()
reply = ["[bot]正在更新所有插件,请勿重复发起..."]
return True, reply
@AbstractCommandNode.register(
parent=PluginCommand,
name="del",
description="删除插件",
usage="!plugin del <插件名>",
aliases=[],
privilege=2
)
class PluginDelCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
plugin_list = plugin_host.__plugins__
reply = []
if len(ctx.crt_params) < 1:
reply = ["[bot]err: 未指定插件名"]
else:
plugin_name = ctx.crt_params[0]
if plugin_name in plugin_list:
unin_path = plugin_host.uninstall_plugin(plugin_name)
reply = ["[bot]已删除插件: {} ({}), 请发送 !reload 重载插件".format(plugin_name, unin_path)]
else:
reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)]
return True, reply
@AbstractCommandNode.register(
parent=PluginCommand,
name="on",
description="启用指定插件",
usage="!plugin on <插件名>",
aliases=[],
privilege=2
)
@AbstractCommandNode.register(
parent=PluginCommand,
name="off",
description="禁用指定插件",
usage="!plugin off <插件名>",
aliases=[],
privilege=2
)
class PluginOnOffCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.plugin.switch as plugin_switch
plugin_list = plugin_host.__plugins__
reply = []
print(ctx.params)
new_status = ctx.params[0] == 'on'
if len(ctx.crt_params) < 1:
reply = ["[bot]err: 未指定插件名"]
else:
plugin_name = ctx.crt_params[0]
if plugin_name in plugin_list:
plugin_list[plugin_name]['enabled'] = new_status
plugin_switch.dump_switch()
reply = ["[bot]已{}插件: {}".format("启用" if new_status else "禁用", plugin_name)]
else:
reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)]
return True, reply

View File

@ -1,298 +0,0 @@
# 会话管理相关指令
import datetime
import json
from pkg.qqbot.cmds.model import command
import pkg.openai.session
import pkg.utils.context
import config
@command(
"reset",
"重置当前会话",
"!reset\n!reset [使用情景预设名称]",
[],
False
)
def cmd_reset(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""重置会话"""
reply = []
if len(params) == 0:
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = ["[bot]会话已重置"]
else:
try:
import pkg.openai.dprompt as dprompt
pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0])
reply = ["[bot]会话已重置,使用场景预设:{}".format(dprompt.mode_inst().get_full_name(params[0]))]
except Exception as e:
reply = ["[bot]会话重置失败,错误信息:{}".format(e)]
return reply
@command(
"last",
"切换到前一次会话",
"!last",
[],
False
)
def cmd_last(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""切换到前一次会话"""
reply = []
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = ["[bot]没有前一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)]
return reply
@command(
"next",
"切换到后一次会话",
"!next",
[],
False
)
def cmd_next(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: int, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""切换到后一次会话"""
reply = []
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = ["[bot]没有后一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)]
return reply
@command(
"prompt",
"获取当前会话的前文",
"!prompt",
[],
False
)
def cmd_prompt(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""获取当前会话的前文"""
reply = []
msgs = ""
session: list = pkg.openai.session.get_session(session_name).prompt
for msg in session:
if len(params) != 0 and params[0] in ['-all', '-a']:
msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content'])
elif len(msg['content']) > 30:
msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30])
else:
msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content'])
reply = ["[bot]当前对话所有内容:\n{}".format(msgs)]
return reply
@command(
"list",
"列出当前会话的所有历史记录",
"!list\n!list [页数]",
[],
False
)
def cmd_list(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""列出当前会话的所有历史记录"""
reply = []
pkg.openai.session.get_session(session_name).persistence()
page = 0
if len(params) > 0:
try:
page = int(params[0])
except ValueError:
pass
results = pkg.openai.session.get_session(session_name).list_history(page=page)
if len(results) == 0:
reply = ["[bot]第{}页没有历史会话".format(page)]
else:
reply_str = "[bot]历史会话 第{}页:\n".format(page)
current = -1
for i in range(len(results)):
# 时间(使用create_timestamp转换) 序号 部分内容
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp'])
msg = ""
try:
msg = json.loads(results[i]['prompt'])
except json.decoder.JSONDecodeError:
msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt'])
# 持久化
pkg.openai.session.get_session(session_name).persistence()
if len(msg) >= 2:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
msg[0]['content'])
else:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
"无内容")
if results[i]['create_timestamp'] == pkg.openai.session.get_session(
session_name).create_timestamp:
current = i + page * 10
reply_str += "\n以上信息倒序排列"
if current != -1:
reply_str += ",当前会话是 #{}\n".format(current)
else:
reply_str += ",当前处于全新会话或不在此页"
reply = [reply_str]
return reply
@command(
"resend",
"重新获取上一次问题的回复",
"!resend",
[],
False
)
def cmd_resend(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""重新获取上一次问题的回复"""
reply = []
session = pkg.openai.session.get_session(session_name)
to_send = session.undo()
mgr = pkg.utils.context.get_qqbot_manager()
reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config,
launcher_type, launcher_id, sender_id)
return reply
@command(
"del",
"删除当前会话的历史记录",
"!del <序号>\n!del all",
[],
False
)
def cmd_del(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""删除当前会话的历史记录"""
reply = []
if len(params) == 0:
reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"]
else:
if params[0] == 'all':
pkg.openai.session.get_session(session_name).delete_all_history()
reply = ["[bot]已删除所有历史会话"]
elif params[0].isdigit():
if pkg.openai.session.get_session(session_name).delete_history(int(params[0])):
reply = ["[bot]已删除历史会话 #{}".format(params[0])]
else:
reply = ["[bot]没有历史会话 #{}".format(params[0])]
else:
reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"]
return reply
@command(
"default",
"操作情景预设",
"!default\n!default [指定情景预设为默认]",
[],
False
)
def cmd_default(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""操作情景预设"""
reply = []
if len(params) == 0:
# 输出目前所有情景预设
import pkg.openai.dprompt as dprompt
reply_str = "[bot]当前所有情景预设({}模式):\n\n".format(config.preset_mode)
prompts = dprompt.mode_inst().list()
for key in prompts:
pro = prompts[key]
reply_str += "名称: {}".format(key)
for r in pro:
reply_str += "\n - [{}]: {}".format(r['role'], r['content'])
reply_str += "\n\n"
reply_str += "\n当前默认情景预设:{}\n".format(dprompt.mode_inst().get_using_name())
reply_str += "请使用 !default <情景预设名称> 来设置默认情景预设"
reply = [reply_str]
elif len(params) > 0 and is_admin:
# 设置默认情景
import pkg.openai.dprompt as dprompt
try:
full_name = dprompt.mode_inst().set_using_name(params[0])
reply = ["[bot]已设置默认情景预设为:{}".format(full_name)]
except Exception as e:
reply = ["[bot]err: {}".format(e)]
else:
reply = ["[bot]err: 仅管理员可设置默认情景预设"]
return reply
@command(
"delhst",
"删除指定会话的所有历史记录",
"!delhst <会话名称>\n!delhst all",
[],
True
)
def cmd_delhst(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""删除指定会话的所有历史记录"""
reply = []
if len(params) == 0:
reply = [
"[bot]err:请输入要删除的会话名: group_<群号> 或者 person_<QQ号>, 或使用 !delhst all 删除所有会话的历史记录"]
else:
if params[0] == "all":
pkg.utils.context.get_database_manager().delete_all_session_history()
reply = ["[bot]已删除所有会话的历史记录"]
else:
if pkg.utils.context.get_database_manager().delete_all_history(params[0]):
reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])]
else:
reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])]
return reply

View File

View File

@ -0,0 +1,73 @@
from ..mgr import AbstractCommandNode, Context
@AbstractCommandNode.register(
parent=None,
name="default",
description="操作情景预设",
usage="!default\n!default set [指定情景预设为默认]",
aliases=[],
privilege=1
)
class DefaultCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
params = ctx.params
reply = []
import config
if len(params) == 0:
# 输出目前所有情景预设
import pkg.openai.dprompt as dprompt
reply_str = "[bot]当前所有情景预设({}模式):\n\n".format(config.preset_mode)
prompts = dprompt.mode_inst().list()
for key in prompts:
pro = prompts[key]
reply_str += "名称: {}".format(key)
for r in pro:
reply_str += "\n - [{}]: {}".format(r['role'], r['content'])
reply_str += "\n\n"
reply_str += "\n当前默认情景预设:{}\n".format(dprompt.mode_inst().get_using_name())
reply_str += "请使用 !default set <情景预设名称> 来设置默认情景预设"
reply = [reply_str]
elif params[0] != "set":
reply = ["[bot]err: 已弃用,请使用!default set <情景预设名称> 来设置默认情景预设"]
else:
return False, []
return True, reply
@AbstractCommandNode.register(
parent=DefaultCommand,
name="set",
description="设置默认情景预设",
usage="!default set <情景预设名称>",
aliases=[],
privilege=2
)
class DefaultSetCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
reply = []
if len(ctx.crt_params) == 0:
reply = ["[bot]err: 请指定情景预设名称"]
elif len(ctx.crt_params) > 0:
import pkg.openai.dprompt as dprompt
try:
full_name = dprompt.mode_inst().set_using_name(ctx.crt_params[0])
reply = ["[bot]已设置默认情景预设为:{}".format(full_name)]
except Exception as e:
reply = ["[bot]err: {}".format(e)]
else:
reply = ["[bot]err: 仅管理员可设置默认情景预设"]
return True, reply

View File

@ -0,0 +1,52 @@
from ..mgr import AbstractCommandNode, Context
import datetime
@AbstractCommandNode.register(
parent=None,
name="del",
description="删除当前会话的历史记录",
usage="!del <序号>\n!del all",
aliases=[],
privilege=1
)
class DelCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
params = ctx.params
reply = []
if len(params) == 0:
reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"]
else:
if params[0] == 'all':
return False, []
elif params[0].isdigit():
if pkg.openai.session.get_session(session_name).delete_history(int(params[0])):
reply = ["[bot]已删除历史会话 #{}".format(params[0])]
else:
reply = ["[bot]没有历史会话 #{}".format(params[0])]
else:
reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"]
return True, reply
@AbstractCommandNode.register(
parent=DelCommand,
name="all",
description="删除当前会话的全部历史记录",
usage="!del all",
aliases=[],
privilege=1
)
class DelAllCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
reply = []
pkg.openai.session.get_session(session_name).delete_all_history()
reply = ["[bot]已删除所有历史会话"]
return True, reply

View File

@ -0,0 +1,50 @@
from ..mgr import AbstractCommandNode, Context
@AbstractCommandNode.register(
parent=None,
name="delhst",
description="删除指定会话的所有历史记录",
usage="!delhst <会话名称>\n!delhst all",
aliases=[],
privilege=2
)
class DelHistoryCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
import pkg.utils.context
params = ctx.params
reply = []
if len(params) == 0:
reply = [
"[bot]err:请输入要删除的会话名: group_<群号> 或者 person_<QQ号>, 或使用 !delhst all 删除所有会话的历史记录"]
else:
if params[0] == 'all':
return False, []
else:
if pkg.utils.context.get_database_manager().delete_all_history(params[0]):
reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])]
else:
reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])]
return True, reply
@AbstractCommandNode.register(
parent=DelHistoryCommand,
name="all",
description="删除所有会话的全部历史记录",
usage="!delhst all",
aliases=[],
privilege=2
)
class DelAllHistoryCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.utils.context
reply = []
pkg.utils.context.get_database_manager().delete_all_session_history()
reply = ["[bot]已删除所有会话的历史记录"]
return True, reply

View File

@ -0,0 +1,28 @@
from ..mgr import AbstractCommandNode, Context
import datetime
@AbstractCommandNode.register(
parent=None,
name="last",
description="切换前一次对话",
usage="!last",
aliases=[],
privilege=1
)
class LastCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
reply = []
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = ["[bot]没有前一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)]
return True, reply

View File

@ -0,0 +1,67 @@
from ..mgr import AbstractCommandNode, Context
import datetime
import json
@AbstractCommandNode.register(
parent=None,
name='list',
description='列出当前会话的所有历史记录',
usage='!list\n!list [页数]',
aliases=[],
privilege=1
)
class ListCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
params = ctx.params
reply = []
pkg.openai.session.get_session(session_name).persistence()
page = 0
if len(params) > 0:
try:
page = int(params[0])
except ValueError:
pass
results = pkg.openai.session.get_session(session_name).list_history(page=page)
if len(results) == 0:
reply = ["[bot]第{}页没有历史会话".format(page)]
else:
reply_str = "[bot]历史会话 第{}页:\n".format(page)
current = -1
for i in range(len(results)):
# 时间(使用create_timestamp转换) 序号 部分内容
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp'])
msg = ""
try:
msg = json.loads(results[i]['prompt'])
except json.decoder.JSONDecodeError:
msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt'])
# 持久化
pkg.openai.session.get_session(session_name).persistence()
if len(msg) >= 2:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
msg[0]['content'])
else:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
"无内容")
if results[i]['create_timestamp'] == pkg.openai.session.get_session(
session_name).create_timestamp:
current = i + page * 10
reply_str += "\n以上信息倒序排列"
if current != -1:
reply_str += ",当前会话是 #{}\n".format(current)
else:
reply_str += ",当前处于全新会话或不在此页"
reply = [reply_str]
return True, reply

View File

@ -0,0 +1,28 @@
from ..mgr import AbstractCommandNode, Context
import datetime
@AbstractCommandNode.register(
parent=None,
name="next",
description="切换后一次对话",
usage="!next",
aliases=[],
privilege=1
)
class NextCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
reply = []
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = ["[bot]没有后一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)]
return True, reply

View File

@ -0,0 +1,32 @@
from ..mgr import AbstractCommandNode, Context
import datetime
@AbstractCommandNode.register(
parent=None,
name="prompt",
description="获取当前会话的前文",
usage="!prompt",
aliases=[],
privilege=1
)
class PromptCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
session_name = ctx.session_name
params = ctx.params
reply = []
msgs = ""
session: list = pkg.openai.session.get_session(session_name).prompt
for msg in session:
if len(params) != 0 and params[0] in ['-all', '-a']:
msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content'])
elif len(msg['content']) > 30:
msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30])
else:
msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content'])
reply = ["[bot]当前对话所有内容:\n{}".format(msgs)]
return True, reply

View File

@ -0,0 +1,30 @@
from ..mgr import AbstractCommandNode, Context
import datetime
@AbstractCommandNode.register(
parent=None,
name="resend",
description="重新获取上一次问题的回复",
usage="!resend",
aliases=[],
privilege=1
)
class ResendCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import pkg.openai.session
import config
session_name = ctx.session_name
reply = []
session = pkg.openai.session.get_session(session_name)
to_send = session.undo()
mgr = pkg.utils.context.get_qqbot_manager()
reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config,
ctx.launcher_type, ctx.launcher_id,
ctx.sender_id)
return True, reply

View File

@ -0,0 +1,34 @@
from ..mgr import AbstractCommandNode, Context
import pkg.openai.session
import pkg.utils.context
@AbstractCommandNode.register(
parent=None,
name='reset',
description='重置当前会话',
usage='!reset',
aliases=[],
privilege=1
)
class ResetCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
params = ctx.params
session_name = ctx.session_name
reply = ""
if len(params) == 0:
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = ["[bot]会话已重置"]
else:
try:
import pkg.openai.dprompt as dprompt
pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0])
reply = ["[bot]会话已重置,使用场景预设:{}".format(dprompt.mode_inst().get_full_name(params[0]))]
except Exception as e:
reply = ["[bot]会话重置失败:{}".format(e)]
return True, reply

View File

@ -1,216 +0,0 @@
from pkg.qqbot.cmds.model import command
import pkg.utils.context
import pkg.utils.updater
import pkg.utils.credit as credit
import config
import logging
import os
import threading
import traceback
import json
@command(
"help",
"获取帮助信息",
"!help",
[],
False
)
def cmd_help(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""获取帮助信息"""
return ["[bot]" + config.help_message]
@command(
"usage",
"获取使用情况",
"!usage",
[],
False
)
def cmd_usage(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""获取使用情况"""
reply = []
reply_str = "[bot]各api-key使用情况:\n\n"
api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key
for key_name in api_keys:
text_length = pkg.utils.context.get_openai_manager().audit_mgr \
.get_text_length_of_key(api_keys[key_name])
image_count = pkg.utils.context.get_openai_manager().audit_mgr \
.get_image_count_of_key(api_keys[key_name])
reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length),
int(image_count))
# 获取此key的额度
try:
http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None
credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy)
reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted'])
except Exception as e:
logging.warning("获取额度失败:{}".format(e))
reply = [reply_str]
return reply
@command(
"version",
"查看版本信息",
"!version",
[],
False
)
def cmd_version(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""查看版本信息"""
reply = []
reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info())
try:
if pkg.utils.updater.is_new_version_available():
reply_str += "\n有新版本可用,请使用命令 !update 进行更新"
except:
pass
reply = [reply_str]
return reply
@command(
"reload",
"执行热重载",
"!reload",
[],
True
)
def cmd_reload(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""执行热重载"""
import pkg.utils.reloader
def reload_task():
pkg.utils.reloader.reload_all()
threading.Thread(target=reload_task, daemon=True).start()
@command(
"update",
"更新程序",
"!update",
[],
True
)
def cmd_update(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""更新程序"""
reply = []
import pkg.utils.updater
import pkg.utils.reloader
import pkg.utils.context
def update_task():
try:
if pkg.utils.updater.update_all():
pkg.utils.reloader.reload_all(notify=False)
pkg.utils.context.get_qqbot_manager().notify_admin("更新完成")
else:
pkg.utils.context.get_qqbot_manager().notify_admin("无新版本")
except Exception as e0:
traceback.print_exc()
pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0))
return
threading.Thread(target=update_task, daemon=True).start()
reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."]
def config_operation(cmd, params):
reply = []
config = pkg.utils.context.get_config()
reply_str = ""
if len(params) == 0:
reply = ["[bot]err:请输入配置项"]
else:
cfg_name = params[0]
if cfg_name == 'all':
reply_str = "[bot]所有配置项:\n\n"
for cfg in dir(config):
if not cfg.startswith('__') and not cfg == 'logging':
# 根据配置项类型进行格式化如果是字典则转换为json并格式化
if isinstance(getattr(config, cfg), str):
reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg))
elif isinstance(getattr(config, cfg), dict):
# 不进行unicode转义并格式化
reply_str += "{}: {}\n".format(cfg,
json.dumps(getattr(config, cfg),
ensure_ascii=False, indent=4))
else:
reply_str += "{}: {}\n".format(cfg, getattr(config, cfg))
reply = [reply_str]
elif cfg_name in dir(config):
if len(params) == 1:
# 按照配置项类型进行格式化
if isinstance(getattr(config, cfg_name), str):
reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name))
elif isinstance(getattr(config, cfg_name), dict):
reply_str = "[bot]配置项{}: {}\n".format(cfg_name,
json.dumps(getattr(config, cfg_name),
ensure_ascii=False, indent=4))
else:
reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name))
reply = [reply_str]
else:
cfg_value = " ".join(params[1:])
# 类型转换如果是json则转换为字典
if cfg_value == 'true':
cfg_value = True
elif cfg_value == 'false':
cfg_value = False
elif cfg_value.isdigit():
cfg_value = int(cfg_value)
elif cfg_value.startswith('{') and cfg_value.endswith('}'):
cfg_value = json.loads(cfg_value)
else:
try:
cfg_value = float(cfg_value)
except ValueError:
pass
# 检查类型是否匹配
if isinstance(getattr(config, cfg_name), type(cfg_value)):
setattr(config, cfg_name, cfg_value)
pkg.utils.context.set_config(config)
reply = ["[bot]配置项{}修改成功".format(cfg_name)]
else:
reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)]
else:
reply = ["[bot]err:未找到配置项 {}".format(cfg_name)]
return reply
@command(
"cfg",
"配置文件相关操作",
"!cfg all\n!cfg <配置项名称>\n!cfg <配置项名称> <配置项新值>",
[],
True
)
def cmd_cfg(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""配置文件相关操作"""
reply = config_operation(cmd, params)
return reply

View File

View File

@ -0,0 +1,38 @@
from ..mgr import AbstractCommandNode, Context, __command_list__
@AbstractCommandNode.register(
parent=None,
name="help",
description="显示帮助信息",
usage="!help\n!help <指令名称>",
aliases=[],
privilege=1
)
class HelpCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
command_list = __command_list__
reply = []
if len(ctx.params) == 0:
reply_str = "[bot]当前所有指令:\n\n"
# 遍历顶级指令
for key in command_list:
command = command_list[key]
if command['parent'] is None:
reply_str += "!{} - {}\n".format(key, command['description'])
reply_str += "\n请使用 !help <指令名称> 来查看指令的详细信息"
reply = [reply_str]
else:
command_name = ctx.params[0]
if command_name in command_list:
reply = [command_list[command_name]['cls'].help()]
else:
reply = ["[bot]指令 {} 不存在".format(command_name)]
return True, reply

View File

@ -0,0 +1,23 @@
from ..mgr import AbstractCommandNode, Context
import threading
@AbstractCommandNode.register(
parent=None,
name="reload",
description="执行热重载",
usage="!reload",
aliases=[],
privilege=2
)
class ReloadCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
reply = []
import pkg.utils.reloader
def reload_task():
pkg.utils.reloader.reload_all()
threading.Thread(target=reload_task, daemon=True).start()
return True, reply

View File

@ -0,0 +1,38 @@
from ..mgr import AbstractCommandNode, Context
import threading
import traceback
@AbstractCommandNode.register(
parent=None,
name="update",
description="更新程序",
usage="!update",
aliases=[],
privilege=2
)
class UpdateCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
reply = []
import pkg.utils.updater
import pkg.utils.reloader
import pkg.utils.context
def update_task():
try:
if pkg.utils.updater.update_all():
pkg.utils.reloader.reload_all(notify=False)
pkg.utils.context.get_qqbot_manager().notify_admin("更新完成")
else:
pkg.utils.context.get_qqbot_manager().notify_admin("无新版本")
except Exception as e0:
traceback.print_exc()
pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0))
return
threading.Thread(target=update_task, daemon=True).start()
reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."]
return True, reply

View File

@ -0,0 +1,42 @@
from ..mgr import AbstractCommandNode, Context
import logging
@AbstractCommandNode.register(
parent=None,
name="usage",
description="获取使用情况",
usage="!usage",
aliases=[],
privilege=1
)
class UsageCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
import config
import pkg.utils.credit as credit
import pkg.utils.context
reply = []
reply_str = "[bot]各api-key使用情况:\n\n"
api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key
for key_name in api_keys:
text_length = pkg.utils.context.get_openai_manager().audit_mgr \
.get_text_length_of_key(api_keys[key_name])
image_count = pkg.utils.context.get_openai_manager().audit_mgr \
.get_image_count_of_key(api_keys[key_name])
reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length),
int(image_count))
# 获取此key的额度
try:
http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None
credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy)
reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted'])
except Exception as e:
logging.warning("获取额度失败:{}".format(e))
reply = [reply_str]
return True, reply

View File

@ -0,0 +1,27 @@
from ..mgr import AbstractCommandNode, Context
@AbstractCommandNode.register(
parent=None,
name="version",
description="查看版本信息",
usage="!version",
aliases=[],
privilege=1
)
class VersionCommand(AbstractCommandNode):
@classmethod
def process(cls, ctx: Context) -> tuple[bool, list]:
reply = []
import pkg.utils.updater
reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info())
try:
if pkg.utils.updater.is_new_version_available():
reply_str += "\n有新版本可用,请使用命令 !update 进行更新"
except:
pass
reply = [reply_str]
return True, reply

View File

@ -13,7 +13,8 @@ import pkg.utils.updater
import pkg.utils.context
import pkg.qqbot.message
import pkg.utils.credit as credit
import pkg.qqbot.cmds.model as cmdmodel
# import pkg.qqbot.cmds.model as cmdmodel
import pkg.qqbot.cmds.mgr as cmdmgr
from mirai import Image
@ -36,22 +37,24 @@ def process_command(session_name: str, text_message: str, mgr, config,
params = [cmd[1:]] + params
cmd = 'cfg'
# 选择指令处理函数
cmd_obj = cmdmodel.search(cmd)
if cmd_obj is not None and (cmd_obj['admin_only'] is False or is_admin):
cmd_func = cmd_obj['func']
reply = cmd_func(
cmd=cmd,
params=params,
session_name=session_name,
text_message=text_message,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_id,
is_admin=is_admin,
)
else:
reply = ["[bot]err:未知的指令或权限不足: " + cmd]
# 包装参数
context = cmdmgr.Context(
command=cmd,
crt_command=cmd,
params=params,
crt_params=params[:],
session_name=session_name,
text_message=text_message,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_id,
is_admin=is_admin,
privilege=2 if is_admin else 1, # 普通用户1管理员2
)
try:
reply = cmdmgr.execute(context)
except cmdmgr.CommandPrivilegeError as e:
reply = ["[bot]err:{}".format(e)]
return reply
except Exception as e:

View File

@ -28,6 +28,11 @@ def reload_all(notify=True):
import main
main.stop()
# 删除所有已注册的指令
import pkg.qqbot.cmds.mgr as cmdsmgr
cmdsmgr.__command_list__ = {}
cmdsmgr.__tree_index__ = {}
# 重载所有模块
context.context['exceeded_keys'] = context.get_openai_manager().key_mgr.exceeded
this_context = context.context