QQ机器人开发全攻略(基于NoneBot框架)
if session.event['group_id'] == 1121159492:# 指定转发源群。"ws_reverse_servers": [{// 反向WebSocket配置。"password": "z147258951.",// 密码。logging.error(f"消息发送失败: {str(e)}")@on_command('huoqu', aliases=('获取',))awai
·
一、项目架构
复制
├── bot.py # 主入口文件 ├── config.py # 基础配置 ├── config.json # 详细配置(含账号信息) ├── device.json # 设备模拟信息 ├── ceshi.py # 核心功能实现 ├── qqun.py # 群成员管理 └── qq.py # WebSocket消息转发
二、核心功能实现
1. 基础配置(config.py)
python
复制
from nonebot.default_config import *
HOST = '0.0.0.0'
PORT = 8080
DEBUG = False
SUPERUSERS = {228703575} # 管理员QQ号
COMMAND_START = {'', '/', '!', '/', '!'}
2. 消息监听与处理(ceshi.py)
python
复制
@on_natural_language(only_to_me=False)
async def handle_message(session: NLPSession):
msg = str(session.ctx["message"])
# 情话自动回复
if "情话" in msg:
async with aiohttp.ClientSession() as sess:
async with sess.get('https://chp.shadiao.app/api.php') as res:
await session.send(await res.text())
# 淘口令转换
if re.search(r'[a-zA-Z0-9@¥$&¢\(\)]{8,18}', msg):
await convert_taokouling(session, msg)
async def convert_taokouling(session, msg):
"""淘口令高佣转换"""
params = {
'appkey': 'b5d1a0c33d18410aaa412c792e4afa3f',
'sid': '29513',
'pid': 'mm_124020275_27934084_107154819',
'tkl': msg,
'signurl': 5
}
async with aiohttp.ClientSession() as sess:
async with sess.post('https://api.zhetaoke.com:10001/api/open_gaoyongzhuanlian_tkl.ashx',
data=params) as res:
result = await res.json()
if result['status'] == 200:
new_tkl = result['content'][0]['tkl']
await forward_to_groups(session, msg.replace(old_tkl, new_tkl))
3. 群成员管理(qqun.py)
python
复制
@on_command('huoqu', aliases=('获取',))
async def get_group_members(session: CommandSession):
"""导出群成员列表"""
members = await session.bot.get_group_member_list(
group_id=609896480 # 替换为目标群号
)
with open('群成员.txt', 'w', encoding='utf-8') as f:
for member in members:
f.write(f"{member['user_id']}\n")
await session.send("群成员导出完成!")
三、高级功能实现
1. 跨平台消息转发(qq.py)
python
复制
@on_natural_language(only_to_me=False)
async def forward_to_websocket(session: NLPSession):
"""QQ消息转发到WebSocket"""
if session.event['group_id'] == 1121159492: # 指定转发源群
ws = create_connection("ws://127.0.0.1:8081/")
ws.send(json.dumps({
"token": "zqllmt",
"api": "SendTextMsg",
"msg": str(session.ctx["message"])
}))
2. 配置文件详解(config.json)
json
复制
{
"uin": 228895590, // 机器人QQ号
"password": "z147258951.", // 密码
"http_config": { // HTTP服务配置
"host": "0.0.0.0",
"port": 5700
},
"ws_reverse_servers": [{ // 反向WebSocket配置
"reverse_url": "ws://0.0.0.0:8080/ws/"
}]
}
四、部署指南
1. 环境准备
bash
复制
pip install nonebot aiocqhttp aiohttp requests websocket-client
2. 启动机器人
bash
复制
python bot.py
3. 常用指令
| 指令示例 | 功能说明 |
|---|---|
| /获取 | 导出群成员列表 |
| 发送"情话" | 获取随机情话 |
| 发送淘口令 | 自动转换高佣链接 |
五、技术要点
-
消息处理流程
mermaid
复制
sequenceDiagram 用户->>QQ机器人: 发送消息 QQ机器人->>NoneBot: 触发事件处理 NoneBot->>自定义逻辑: 匹配处理规则 自定义逻辑-->>QQ机器人: 返回响应内容 QQ机器人->>用户: 回复消息 -
异常处理建议
python
复制
async def safe_send(session, message): try: await session.send(message) except Exception as e: logging.error(f"消息发送失败: {str(e)}")
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)