QChatGPT/pkg/qqbot/sources/nakuru.py

320 lines
12 KiB
Python
Raw Normal View History

2023-04-23 15:58:37 +08:00
import mirai
from ..adapter import MessageSourceAdapter, MessageConverter, EventConverter
import nakuru
2023-04-23 23:40:08 +08:00
import nakuru.entities.components as nkc
2023-04-23 15:58:37 +08:00
import asyncio
import typing
2023-04-23 23:40:08 +08:00
import traceback
import logging
import json
2023-04-23 15:58:37 +08:00
2023-04-24 12:46:33 +08:00
from pkg.qqbot.blob import Forward, ForwardMessageNode, ForwardMessageDiaplay
2023-04-23 15:58:37 +08:00
class NakuruProjectMessageConverter(MessageConverter):
2023-04-24 10:34:51 +08:00
"""消息转换器"""
2023-04-23 15:58:37 +08:00
@staticmethod
def yiri2target(message_chain: mirai.MessageChain) -> list:
2023-04-23 23:40:08 +08:00
msg_list = []
if type(message_chain) is mirai.MessageChain:
msg_list = message_chain.__root__
elif type(message_chain) is list:
msg_list = message_chain
else:
raise Exception("Unknown message type: " + str(message_chain) + type(message_chain))
nakuru_msg_list = []
# 遍历并转换
for component in msg_list:
if type(component) is mirai.Plain:
2023-04-24 12:46:33 +08:00
nakuru_msg_list.append(nkc.Plain(component.text, False))
2023-04-23 23:40:08 +08:00
elif type(component) is mirai.Image:
if component.url is not None:
nakuru_msg_list.append(nkc.Image.fromURL(component.url))
elif component.base64 is not None:
nakuru_msg_list.append(nkc.Image.fromBase64(component.base64))
elif component.path is not None:
nakuru_msg_list.append(nkc.Image.fromFileSystem(component.path))
elif type(component) is mirai.Face:
nakuru_msg_list.append(nkc.Face(id=component.face_id))
elif type(component) is mirai.At:
nakuru_msg_list.append(nkc.At(qq=component.target))
elif type(component) is mirai.AtAll:
nakuru_msg_list.append(nkc.AtAll())
elif type(component) is mirai.Voice:
2023-04-24 15:55:21 +08:00
if component.url is not None:
nakuru_msg_list.append(nkc.Record.fromURL(component.url))
elif component.path is not None:
nakuru_msg_list.append(nkc.Record.fromFileSystem(component.path))
2023-04-24 12:46:33 +08:00
elif type(component) is Forward:
# 转发消息
yiri_forward_node_list = component.node_list
nakuru_forward_node_list = []
# 遍历并转换
for yiri_forward_node in yiri_forward_node_list:
try:
content_list = NakuruProjectMessageConverter.yiri2target(yiri_forward_node.message_chain)
nakuru_forward_node = nkc.Node(
name=yiri_forward_node.sender_name,
uin=yiri_forward_node.sender_id,
time=int(yiri_forward_node.time.timestamp()) if yiri_forward_node.time is not None else None,
content=content_list
)
nakuru_forward_node_list.append(nakuru_forward_node)
except Exception as e:
import traceback
traceback.print_exc()
nakuru_msg_list.append(nakuru_forward_node_list)
2023-04-23 23:40:08 +08:00
else:
2023-04-24 12:46:33 +08:00
nakuru_msg_list.append(nkc.Plain(str(component)))
2023-04-23 23:40:08 +08:00
return nakuru_msg_list
2023-04-23 15:58:37 +08:00
@staticmethod
2023-04-24 10:57:43 +08:00
def target2yiri(message_chain: typing.Any, message_id: int = -1) -> mirai.MessageChain:
2023-04-24 10:34:51 +08:00
"""将Yiri的消息链转换为YiriMirai的消息链"""
2023-04-23 23:40:08 +08:00
assert type(message_chain) is list
yiri_msg_list = []
2023-04-24 10:57:43 +08:00
import datetime
2023-04-24 11:21:51 +08:00
# 添加Source组件以标记message_id等信息
2023-04-24 10:57:43 +08:00
yiri_msg_list.append(mirai.models.message.Source(id=message_id, time=datetime.datetime.now()))
2023-04-23 23:40:08 +08:00
for component in message_chain:
if type(component) is nkc.Plain:
yiri_msg_list.append(mirai.Plain(text=component.text))
elif type(component) is nkc.Image:
yiri_msg_list.append(mirai.Image(url=component.url))
elif type(component) is nkc.Face:
yiri_msg_list.append(mirai.Face(face_id=component.id))
elif type(component) is nkc.At:
yiri_msg_list.append(mirai.At(target=component.qq))
elif type(component) is nkc.AtAll:
yiri_msg_list.append(mirai.AtAll())
else:
pass
logging.debug("转换后的消息链: " + str(yiri_msg_list))
2023-04-24 10:57:43 +08:00
chain = mirai.MessageChain(yiri_msg_list)
return chain
2023-04-23 15:58:37 +08:00
class NakuruProjectEventConverter(EventConverter):
2023-04-24 10:34:51 +08:00
"""事件转换器"""
2023-04-23 15:58:37 +08:00
@staticmethod
def yiri2target(event: typing.Type[mirai.Event]):
if event is mirai.GroupMessage:
2023-04-23 23:40:08 +08:00
return nakuru.GroupMessage
2023-04-23 15:58:37 +08:00
elif event is mirai.FriendMessage:
2023-04-23 23:40:08 +08:00
return nakuru.FriendMessage
2023-04-23 15:58:37 +08:00
else:
2023-04-23 23:40:08 +08:00
raise Exception("未支持转换的事件类型: " + str(event))
2023-04-23 15:58:37 +08:00
@staticmethod
def target2yiri(event: typing.Any) -> mirai.Event:
2023-04-24 10:57:43 +08:00
yiri_chain = NakuruProjectMessageConverter.target2yiri(event.message, event.message_id)
2023-04-24 11:21:51 +08:00
if type(event) is nakuru.FriendMessage: # 私聊消息事件
2023-04-23 23:40:08 +08:00
return mirai.FriendMessage(
sender=mirai.models.entities.Friend(
id=event.sender.user_id,
nickname=event.sender.nickname,
remark=event.sender.nickname
),
2023-04-24 10:57:43 +08:00
message_chain=yiri_chain,
2023-04-23 23:40:08 +08:00
time=event.time
)
2023-04-24 11:21:51 +08:00
elif type(event) is nakuru.GroupMessage: # 群聊消息事件
2023-04-23 23:40:08 +08:00
permission = "MEMBER"
if event.sender.role == "admin":
permission = "ADMINISTRATOR"
elif event.sender.role == "owner":
permission = "OWNER"
import mirai.models.entities as entities
return mirai.GroupMessage(
sender=mirai.models.entities.GroupMember(
id=event.sender.user_id,
member_name=event.sender.nickname,
permission=permission,
group=mirai.models.entities.Group(
id=event.group_id,
name=event.sender.nickname,
permission=entities.Permission.Member
),
special_title=event.sender.title,
join_timestamp=0,
last_speak_timestamp=0,
mute_time_remaining=0,
),
2023-04-24 10:57:43 +08:00
message_chain=yiri_chain,
2023-04-23 23:40:08 +08:00
time=event.time
)
else:
raise Exception("未支持转换的事件类型: " + str(event))
2023-04-23 15:58:37 +08:00
class NakuruProjectAdapter(MessageSourceAdapter):
"""nakuru-project适配器"""
bot: nakuru.CQHTTP
2023-04-24 10:34:51 +08:00
bot_account_id: int
2023-04-23 15:58:37 +08:00
2023-04-23 23:40:08 +08:00
message_converter: NakuruProjectMessageConverter = NakuruProjectMessageConverter()
event_converter: NakuruProjectEventConverter = NakuruProjectEventConverter()
listener_list: list[dict]
2023-04-23 15:58:37 +08:00
2023-04-24 10:34:51 +08:00
def __init__(self, cfg: dict):
2023-04-23 15:58:37 +08:00
"""初始化nakuru-project的对象"""
2023-04-24 10:34:51 +08:00
self.bot = nakuru.CQHTTP(**cfg)
2023-04-23 23:40:08 +08:00
self.listener_list = []
2023-04-24 10:34:51 +08:00
# nakuru库有bug这个接口没法带access_token会失败
# 所以目前自行发请求
import config
import requests
resp = requests.get(
url="http://{}:{}/get_login_info".format(config.nakuru_config['host'], config.nakuru_config['http_port']),
headers={
'Authorization': "Bearer " + config.nakuru_config['token'] if 'token' in config.nakuru_config else ""
},
timeout=5
)
self.bot_account_id = int(resp.json()['data']['user_id'])
2023-04-23 15:58:37 +08:00
def send_message(
self,
target_type: str,
target_id: str,
2023-04-24 12:46:33 +08:00
message: typing.Union[mirai.MessageChain, list],
converted: bool = False
2023-04-23 15:58:37 +08:00
):
task = None
2023-04-24 12:46:33 +08:00
converted_msg = self.message_converter.yiri2target(message) if not converted else message
# 检查是否有转发消息
has_forward = False
for msg in converted_msg:
if type(msg) is list: # 转发消息,仅回复此消息组件
has_forward = True
converted_msg = msg
break
if has_forward:
if target_type == "group":
task = self.bot.sendGroupForwardMessage(int(target_id), converted_msg)
elif target_type == "person":
task = self.bot.sendPrivateForwardMessage(int(target_id), converted_msg)
else:
raise Exception("Unknown target type: " + target_type)
2023-04-23 15:58:37 +08:00
else:
2023-04-24 12:46:33 +08:00
if target_type == "group":
task = self.bot.sendGroupMessage(int(target_id), converted_msg)
elif target_type == "person":
task = self.bot.sendFriendMessage(int(target_id), converted_msg)
else:
raise Exception("Unknown target type: " + target_type)
2023-04-23 15:58:37 +08:00
asyncio.run(task)
def reply_message(
self,
message_source: mirai.MessageEvent,
message: mirai.MessageChain,
quote_origin: bool = False
):
2023-04-24 10:57:43 +08:00
message = self.message_converter.yiri2target(message)
if quote_origin:
# 在前方添加引用组件
message.insert(0, nkc.Reply(
id=message_source.message_chain.message_id,
)
)
2023-04-23 23:40:08 +08:00
if type(message_source) is mirai.GroupMessage:
2023-04-24 12:46:33 +08:00
self.send_message(
"group",
2023-04-23 23:40:08 +08:00
message_source.sender.group.id,
2023-04-24 10:57:43 +08:00
message,
2023-04-24 12:46:33 +08:00
converted=True
2023-04-23 23:40:08 +08:00
)
elif type(message_source) is mirai.FriendMessage:
2023-04-24 12:46:33 +08:00
self.send_message(
"person",
2023-04-23 23:40:08 +08:00
message_source.sender.id,
2023-04-24 10:57:43 +08:00
message,
2023-04-24 12:46:33 +08:00
converted=True
2023-04-23 23:40:08 +08:00
)
else:
raise Exception("Unknown message source type: " + str(type(message_source)))
2023-04-23 15:58:37 +08:00
def is_muted(self, group_id: int) -> bool:
2023-04-24 10:34:51 +08:00
import time
2023-04-24 11:21:51 +08:00
# 检查是否被禁言
2023-04-24 10:34:51 +08:00
group_member_info = asyncio.run(self.bot.getGroupMemberInfo(group_id, self.bot_account_id))
return group_member_info.shut_up_timestamp > int(time.time())
2023-04-23 15:58:37 +08:00
def register_listener(
self,
event_type: typing.Type[mirai.Event],
callback: typing.Callable[[mirai.Event], None]
):
2023-04-23 23:40:08 +08:00
try:
logging.debug("注册监听器: " + str(event_type) + " -> " + str(callback))
2023-04-24 11:21:51 +08:00
# 包装函数
2023-04-23 23:40:08 +08:00
async def listener_wrapper(app: nakuru.CQHTTP, source: self.event_converter.yiri2target(event_type)):
callback(self.event_converter.target2yiri(source))
2023-04-24 11:21:51 +08:00
# 将包装函数和原函数的对应关系存入列表
2023-04-23 23:40:08 +08:00
self.listener_list.append(
{
"event_type": event_type,
"callable": callback,
"wrapper": listener_wrapper,
}
)
2023-04-24 11:21:51 +08:00
# 注册监听器
2023-04-23 23:40:08 +08:00
self.bot.receiver(self.event_converter.yiri2target(event_type).__name__)(listener_wrapper)
logging.debug("注册完成")
except Exception as e:
traceback.print_exc()
raise e
2023-04-23 15:58:37 +08:00
def unregister_listener(
self,
event_type: typing.Type[mirai.Event],
callback: typing.Callable[[mirai.Event], None]
):
2023-04-24 11:21:51 +08:00
nakuru_event_name = self.event_converter.yiri2target(event_type).__name__
2023-04-23 23:40:08 +08:00
new_event_list = []
# 从本对象的监听器列表中查找并删除
target_wrapper = None
for listener in self.listener_list:
if listener["event_type"] == event_type and listener["callable"] == callback:
target_wrapper = listener["wrapper"]
self.listener_list.remove(listener)
break
if target_wrapper is None:
raise Exception("未找到对应的监听器")
for func in self.bot.event[nakuru_event_name]:
if func.callable != target_wrapper:
new_event_list.append(func)
self.bot.event[nakuru_event_name] = new_event_list
2023-04-23 15:58:37 +08:00
def run_sync(self):
2023-04-23 23:40:08 +08:00
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
2023-04-23 15:58:37 +08:00
self.bot.run()
def kill(self) -> bool:
return False