Qwen Agent实战教程-上
# 第一章:环境准备与安装
# 1.1 系统要求检查
首先检查你的系统环境:
# 检查 Python 版本(必须 >= 3.10)
python --version
# 如果版本低于 3.10,请先升级 Python
# Windows: 从 python.org 下载安装
# macOS: brew install python
# Linux: sudo apt-get install python3.10
2
3
4
5
6
7
# 1.2 创建虚拟环境(强烈推荐)
# 创建虚拟环境
python -m venv qwen_agent_env
# 激活虚拟环境
# Windows:
qwen_agent_env\Scripts\activate
# macOS/Linux:
source qwen_agent_env/bin/activate
# 验证激活成功(命令行前面会有 (qwen_agent_env) 标记)
2
3
4
5
6
7
8
9
10
11
# 1.3 升级 pip
pip install --upgrade pip
# 1.4 安装 Qwen-Agent
方式一:完整安装(推荐)
pip install -U "qwen-agent[rag,code_interpreter,gui,mcp]"
这个命令会安装:
qwen-agent:核心框架
rag:检索增强生成支持
code_interpreter:代码执行功能
gui:Gradio 网页界面
mcp:模型上下文协议支持
方式二:最小化安装
pip install -U qwen-agent
如果你只需要基础功能,可以用这个命令。
方式三:从源码安装(开发版)
# 克隆仓库
git clone https://github.com/QwenLM/Qwen-Agent.git
cd Qwen-Agent
# 安装
pip install -e ./"[gui,rag,code_interpreter,mcp]"
2
3
4
5
6
# 1.5 验证安装
# 检查是否安装成功
python -c "import qwen_agent; print(f'Qwen-Agent 版本: {qwen_agent.__version__}')"
# 测试导入
python -c "from qwen_agent.agents import Assistant; print('导入成功')"
2
3
4
5
如果看到版本号和"导入成功",说明安装完成。
# 1.6 安装代码解释器依赖(可选)
如果你想使用代码解释器功能,需要安装 Docker:
# 检查 Docker 是否安装
docker --version
# 如果没有安装,请访问 https://docs.docker.com/get-docker/ 下载安装
2
3
4
# 第二章:第一个 Agent
# 2.1 获取 API Key
使用阿里云 DashScope(推荐新手)
访问阿里云 DashScope:https://dashscope.aliyun.com/
注册/登录账号
进入控制台,创建 API Key
保存你的 API Key
# 2.2 配置环境变量
Linux/macOS
# 在 ~/.bashrc 或 ~/.zshrc 中添加
export DASHSCOPE_API_KEY="your-api-key-here"
# 重新加载配置
source ~/.bashrc # 或 source ~/.zshrc
# 验证
echo $DASHSCOPE_API_KEY
2
3
4
5
6
7
8
Windows
# 在 CMD 中设置
set DASHSCOPE_API_KEY=your-api-key-here
# 在 PowerShell 中设置
$env:DASHSCOPE_API_KEY="your-api-key-here"
# 或者设置环境变量(永久)
# 系统属性 -> 高级 -> 环境变量 -> 新建
# 变量名:DASHSCOPE_API_KEY
# 变量值:your-api-key-here
2
3
4
5
6
7
8
9
10
# 2.3 创建第一个 Agent
创建文件 first_agent.py:
"""
第一个 Qwen Agent 示例
演示如何创建一个基础的对话 Agent
"""
from qwen_agent.agents import Assistant
# 配置模型
llm_cfg = {
# 使用 DashScope 提供的模型服务
'model': 'qwen-max-latest', # 模型名称
'model_type': 'qwen_dashscope', # 模型服务类型
# 'api_key': 'YOUR_DASHSCOPE_API_KEY', # 如果没有设置环境变量,可以在这里填入
'generate_cfg': { # 生成参数
'top_p': 0.8, # 核采样参数
'temperature': 0.7, # 温度参数
}
}
# 创建 Agent
bot = Assistant(
llm=llm_cfg,
system_message='你是一个友好的AI助手,用简洁明了的语言回答问题。'
)
# 运行对话
def chat():
messages = [] # 存储对话历史
print("=== Qwen Agent 对话系统 ===")
print("输入 'quit' 或 'exit' 退出")
print("=" * 50)
while True:
# 获取用户输入
user_input = input("\n你: ")
# 检查退出命令
if user_input.lower() in ['quit', 'exit', '退出']:
print("再见!")
break
# 添加用户消息到历史
messages.append({'role': 'user', 'content': user_input})
# 获取 Agent 回复
print("\nAgent: ", end='', flush=True)
full_response = ""
last_len = 0
try:
# 直接流式调用 LLM
for partial in llm.chat(messages=messages, stream=True):
current = partial[-1]['content']
if len(current) > last_len:
print(current[last_len:], end='', flush=True)
last_len = len(current)
full_response = current
except Exception as e:
print(f"\n[错误] {e}")
full_response = "抱歉,我无法回答这个问题。"
print()
messages.append({'role': 'assistant', 'content': full_response})
if __name__ == '__main__':
chat()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 2.4 运行第一个 Agent
python first_agent.py
你会看到:
=== Qwen Agent 对话系统 ===
输入 'quit' 或 'exit' 退出
==================================================
你: 你好
Agent: 你好!有什么我可以帮助你的吗?
你: 什么是人工智能?
Agent: 人工智能(AI)是指由计算机系统所表现出来的智能...
你: quit
再见!
2
3
4
5
6
7
8
9
10
11
12
# 2.5 理解代码结构
让我详细解释每个部分:
llm_cfg 配置详解
llm_cfg = {
# 模型选择
'model': 'qwen-max-latest', # 最新最强模型
# 可选模型:
# - 'qwen-max-latest': 最强模型
# - 'qwen-plus-latest': 平衡性能和速度
# - 'qwen-turbo-latest': 最快响应
# 服务类型
'model_type': 'qwen_dashscope', # 使用阿里云服务
# 可选类型:
# - 'qwen_dashscope': 阿里云 DashScope
# - 'openai': OpenAI 兼容接口(用于本地模型)
# API Key(可选)
# 'api_key': 'sk-xxx', # 如果没设置环境变量
# 生成参数
'generate_cfg': {
'top_p': 0.8, # 核采样(0-1)
'temperature': 0.7, # 温度(0-1,越高越随机)
'max_tokens': 2000, # 最大生成token数
'repetition_penalty': 1.1, # 重复惩罚
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Assistant 类详解
bot = Assistant(
llm=llm_cfg, # 模型配置(必需)
system_message='...', # 系统提示词(可选)
name='MyAssistant', # Agent 名称(可选)
description='我的助手', # Agent 描述(可选)
function_list=[], # 可用工具列表(可选)
files=[], # 参考文件列表(可选)
lm_cache_enabled=True, # 启用缓存(可选)
)
2
3
4
5
6
7
8
9
消息格式
messages = [
{'role': 'user', 'content': '你好'}, # 用户消息
{'role': 'assistant', 'content': '你好!'}, # 助手消息
{'role': 'system', 'content': '你是一个...'} # 系统消息(可选)
]
2
3
4
5
# 第三章:Web UI 详解
# 3.1 创建带 Web UI 的 Agent
创建文件 web_ui_agent.py:
"""
带 Web UI 的 Agent
演示如何启动 Gradio 网页界面
"""
from qwen_agent.agents import Assistant
from qwen_agent.gui import WebUI
# 配置模型
llm_cfg = {
'model': 'qwen-max-latest',
'model_type': 'qwen_dashscope',
'generate_cfg': {
'top_p': 0.8,
'temperature': 0.7,
}
}
# 创建 Agent
bot = Assistant(
llm=llm_cfg,
system_message='你是一个智能助手,可以帮助用户解决各种问题。'
)
# 启动 Web UI
if __name__ == '__main__':
# 创建 WebUI 实例
ui = WebUI(bot)
# 运行(默认端口 7860)
ui.run(
server_name='0.0.0.0', # 监听地址(0.0.0.0 表示所有网卡)
server_port=7860, # 端口号
share=False, # 是否创建公开链接(True 会生成公网链接)
inbrowser=True # 自动打开浏览器
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 3.2 运行 Web UI
python web_ui_agent.py
浏览器会自动打开 http://localhost:7860
# 3.3 Web UI 功能详解
# 3.3.1 对话区域
输入框:输入你的问题
发送按钮:发送消息
清空对话:清除所有对话历史
导出对话:导出对话记录
# 3.3.2 文件上传
Web UI 支持上传多种文件类型:
# 创建支持文件上传的 Agent
bot = Assistant(
llm=llm_cfg,
system_message='你可以分析上传的文件并回答问题。'
)
2
3
4
5
支持的文件格式:
文本文件:.txt, .md, .csv, .json
文档文件:.pdf, .docx, .pptx
图片文件:.jpg, .png, .gif
# 3.3.3 工具调用监控
当 Agent 使用工具时,UI 会显示:
🔧 使用工具: code_interpreter
📝 执行代码:
import pandas as pd
data = pd.read_csv('data.csv')
print(data.head())
✅ 执行结果:
id name age
0 1 张三 25
1 2 李四 30
2 3 王五 28
💬 回复:
我已经读取了数据,共有3条记录...
2
3
4
5
6
7
8
9
10
11
12
13
14
# 第四章:自定义工具开发
# 4.1 理解工具机制
Qwen Agent 的工具系统基于 Function Calling,允许 Agent 调用外部功能。
工具的基本结构:
from qwen_agent.tools.base import BaseTool, register_tool
@register_tool('tool_name') # 注册工具
class MyTool(BaseTool):
# 工具描述(告诉 Agent 这个工具能做什么)
description = '工具的功能描述'
# 参数定义(告诉 Agent 需要什么参数)
parameters = [{
'name': 'param1',
'type': 'string',
'description': '参数1的说明',
'required': True # 是否必需
}]
# 工具执行逻辑
def call(self, params: str, **kwargs) -> str:
# 解析参数
import json5
args = json5.loads(params)
# 执行逻辑
result = self.do_something(args)
# 返回结果(必须是 JSON 字符串)
return json5.dumps(result, ensure_ascii=False)
def do_something(self, args):
# 实际的业务逻辑
return {'status': 'success', 'data': '...'}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 4.2 创建简单的工具
# 4.2.1 计算器工具
创建文件 calculator_tool.py:
"""
计算器工具
演示如何创建一个简单的数学计算工具
"""
from qwen_agent.tools.base import BaseTool, register_tool
import json5
import operator
@register_tool('calculator')
class Calculator(BaseTool):
"""
数学计算工具
支持加减乘除等基本运算
"""
description = '执行数学计算,支持加减乘除、幂运算、取模等操作'
parameters = [{
'name': 'expression',
'type': 'string',
'description': '数学表达式,例如: 2 + 3 * 4, (10 - 5) / 2, 2 ** 8',
'required': True
}]
def call(self, params: str, **kwargs) -> str:
"""执行计算"""
try:
# 解析参数
# 因为 Qwen 系列模型(尤其是早期版本)在生成 function call 参数时,经常输出“不严格符合 JSON 标准”的文本,所以要用json5
args = json5.loads(params)
expression = args['expression']
# 安全性检查
# 因为 set 的成员检查(in 操作)速度极快,且支持集合运算(如 issubset)。
allowed_chars = set('0123456789+-*/().*%')
# 判断“用户输入的所有字符”是否全部包含在允许的字符集中
if not set(expression).issubset(allowed_chars):
raise ValueError('表达式包含非法字符')
# 执行计算
result = eval(expression)
# 返回结果
return json5.dumps({
'success': True,
'expression': expression,
'result': result,
'message': f'{expression} = {result}'
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'success': False,
'error': str(e),
'message': f'计算失败: {str(e)}'
}, ensure_ascii=False)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 4.2.2 使用计算器工具
创建文件 test_calculator.py:
"""
测试计算器工具
"""
from qwen_agent.agents import Assistant
from calculator_tool import Calculator # 导入自定义工具
# 配置模型
llm_cfg = {
'model': 'qwen-max-latest',
'model_type': 'qwen_dashscope',
'generate_cfg': {'top_p': 0.8}
}
# 创建带工具的 Agent
bot = Assistant(
llm=llm_cfg,
system_message='你是一个数学助手,可以使用计算器工具进行数学计算。',
function_list=['calculator'] # 添加自定义工具
)
# 测试对话
def test():
messages = []
# 测试1:简单计算
print("\n=== 测试1: 简单计算 ===")
messages.append({'role': 'user', 'content': '计算 123 + 456'})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
# 测试2:复杂表达式
print("\n=== 测试2: 复杂表达式 ===")
messages.append({'role': 'user', 'content': '计算 (10 + 20) * 3 - 15 / 5'})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
# 测试3:幂运算
print("\n=== 测试3: 幂运算 ===")
messages.append({'role': 'user', 'content': '计算 2 的 10 次方'})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
if __name__ == '__main__':
test()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
运行测试:
python test_calculator.py
# 4.3 创建实用的工具
# 4.3.1 天气查询工具
创建文件 weather_tool.py:
"""
天气查询工具
使用免费天气 API 查询天气信息
"""
from qwen_agent.tools.base import BaseTool, register_tool
import json5
import requests
@register_tool('weather_query')
class WeatherQuery(BaseTool):
# 天气查询工具
# 查询指定城市的当前天气
description = '查询指定成视的当前天气状况,包括温度,湿度,风速等信息'
parameters = [{
'name': 'city',
'type': 'string',
'description': '城市名称,例如:北京,上海,广州',
'required': True
}]
def call(self, params:str, **kwargs) -> str:
# 查询天气
try:
args = json5.loads(params)
city = args['city']
# 调用高德地图API查询天气
gaode_api_key = "681b65a915ffbcb806f8ec482b2fa638"
base_urk = "https://restapi.amap.com/v3/weather/weatherInfo"
params = {
"key": gaode_api_key,
"city": city,
"extensions": "base" # base:返回实况天气, all:返回预报天气
}
weather_data = ''
response = requests.get(base_urk, params=params)
if response.status_code == 200:
weather_data = response.json()
else:
raise Exception(f"获取天气失败")
return json5.dumps({
'success': True,
'city': city,
'weather_data': weather_data,
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'success': False,
'error': str(e),
'message': f'查询天气失败'
}, ensure_ascii=False)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 第五章:数据库集成
# 5.1 SQLite 数据库工具
创建文件 sqlite_tool.py:
"""
SQLite 数据库工具
支持执行 SQL 查询和操作
"""
from qwen_agent.tools.base import BaseTool, register_tool
import json5
import sqlite3
from typing import List, Dict, Any
@register_tool('sqlite_query')
class SQLiteQuery(BaseTool):
"""
SQLite 查询工具
执行 SQL 查询并返回结果
"""
description = '执行 SQLite 数据库查询语句,返回查询结果'
parameters = [{
'name': 'db_path',
'type': 'string',
'description': '数据库文件路径,例如:./data.db',
'required': True
}, {
'name': 'sql',
'type': 'string',
'description': 'SQL 查询语句,例如:SELECT * FROM users WHERE age > 18',
'required': True
}]
def call(self, params: str, **kwargs) -> str:
"""执行查询"""
try:
args = json5.loads(params)
db_path = args['db_path']
sql = args['sql'].strip()
# 安全检查:只允许 SELECT 语句
if not sql.upper().startswith('SELECT'):
raise ValueError('只允许执行 SELECT 查询语句')
# 连接数据库
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 执行查询
cursor.execute(sql)
# 获取列名
columns = [description[0] for description in cursor.description]
# 获取数据
rows = cursor.fetchall()
# 关闭连接
conn.close()
# 格式化结果
result = {
'columns': columns,
'rows': [list(row) for row in rows],
'row_count': len(rows)
}
return json5.dumps({
'success': True,
'sql': sql,
'result': result,
'message': f'查询成功,共返回 {len(rows)} 条记录'
}, ensure_ascii=False)
except sqlite3.Error as e:
return json5.dumps({
'success': False,
'error': '数据库错误',
'details': str(e)
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'success': False,
'error': '执行失败',
'details': str(e)
}, ensure_ascii=False)
@register_tool('sqlite_execute')
class SQLiteExecute(BaseTool):
"""
SQLite 执行工具
执行 INSERT、UPDATE、DELETE 等操作
"""
description = '执行 SQLite 数据库操作语句(INSERT、UPDATE、DELETE 等),返回受影响的行数'
parameters = [{
'name': 'db_path',
'type': 'string',
'description': '数据库文件路径',
'required': True
}, {
'name': 'sql',
'type': 'string',
'description': 'SQL 操作语句,例如:INSERT INTO users (name, age) VALUES ("张三", 25)',
'required': True
}]
def call(self, params: str, **kwargs) -> str:
"""执行操作"""
try:
args = json5.loads(params)
db_path = args['db_path']
sql = args['sql'].strip()
# 安全检查:禁止 DROP、TRUNCATE 等危险操作
dangerous_keywords = ['DROP', 'TRUNCATE', 'ALTER', 'CREATE']
if any(keyword in sql.upper() for keyword in dangerous_keywords):
raise ValueError('禁止执行危险操作(DROP、TRUNCATE、ALTER、CREATE)')
# 连接数据库
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 执行操作
cursor.execute(sql)
affected_rows = cursor.rowcount
conn.commit()
# 关闭连接
conn.close()
return json5.dumps({
'success': True,
'sql': sql,
'affected_rows': affected_rows,
'message': f'操作成功,影响了 {affected_rows} 行'
}, ensure_ascii=False)
except sqlite3.Error as e:
return json5.dumps({
'success': False,
'error': '数据库错误',
'details': str(e)
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'success': False,
'error': '执行失败',
'details': str(e)
}, ensure_ascii=False)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# 5.2 创建测试数据库
创建文件 create_test_db.py:
"""
创建测试数据库
"""
import sqlite3
def create_database():
"""创建测试数据库"""
# 连接数据库(如果不存在会自动创建)
conn = sqlite3.connect('test_database.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
email TEXT,
city TEXT,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 插入测试数据
test_data = [
('张三', 25, 'zhangsan@example.com', '北京'),
('李四', 30, 'lisi@example.com', '上海'),
('王五', 28, 'wangwu@example.com', '广州'),
('赵六', 35, 'zhaoliu@example.com', '深圳'),
('钱七', 22, 'qianqi@example.com', '杭州'),
]
cursor.executemany(
'INSERT INTO users (name, age, email, city) VALUES (?, ?, ?, ?)',
test_data
)
# 提交事务
conn.commit()
# 验证数据
cursor.execute('SELECT COUNT(*) FROM users')
count = cursor.fetchone()[0]
print(f'数据库创建成功!')
print(f'表名:users')
print(f'记录数:{count}')
# 关闭连接
conn.close()
if __name__ == '__main__':
create_database()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
运行创建数据库:
python create_test_db.py
# 5.3 使用数据库工具
创建文件 test_sqlite_tool.py:
"""
测试 SQLite 数据库工具
"""
from qwen_agent.agents import Assistant
from sqlite_tool import SQLiteQuery, SQLiteExecute
# 配置模型
llm_cfg = {
'model': 'qwen-max-latest',
'model_type': 'qwen_dashscope',
'generate_cfg': {'top_p': 0.8}
}
# 创建 Agent
bot = Assistant(
llm=llm_cfg,
system_message='你是一个数据库助手,可以帮助用户查询和操作 SQLite 数据库。',
function_list=['sqlite_query', 'sqlite_execute']
)
# 测试函数
def test():
messages = []
# 测试1:查询所有用户
print("\n=== 测试1: 查询所有用户 ===")
messages.append({
'role': 'user',
'content': '查询 test_database.db 数据库中 users 表的所有数据'
})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
# 测试2:条件查询
print("\n=== 测试2: 条件查询 ===")
messages.append({
'role': 'user',
'content': '查询年龄大于25岁的用户'
})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
# 测试3:插入数据
print("\n=== 测试3: 插入数据 ===")
messages.append({
'role': 'user',
'content': '向 users 表插入一条新记录:姓名=孙八,年龄=40,邮箱=sunba@example.com,城市=成都'
})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
# 测试4:统计查询
print("\n=== 测试4: 统计查询 ===")
messages.append({
'role': 'user',
'content': '统计每个城市的用户数量'
})
for response in bot.run(messages=messages):
if response[-1]['role'] == 'assistant':
print(f"Agent: {response[-1]['content']}")
if __name__ == '__main__':
test()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# 5.4 MySQL 数据库工具
创建文件 mysql_tool.py:
"""
MySQL 数据库工具
支持连接和操作 MySQL 数据库
"""
from qwen_agent.tools.base import BaseTool, register_tool
import json5
# 注意:需要先安装 mysql-connector-python
# pip install mysql-connector-python
try:
import mysql.connector
from mysql.connector import Error
except ImportError:
print("请先安装 mysql-connector-python: pip install mysql-connector-python")
raise
@register_tool('mysql_query')
class MySQLQuery(BaseTool):
"""
MySQL 查询工具
执行 MySQL 查询并返回结果
"""
description = '执行 MySQL 数据库查询语句,返回查询结果'
parameters = [{
'name': 'host',
'type': 'string',
'description': '数据库主机地址,例如:localhost',
'required': True
}, {
'name': 'port',
'type': 'integer',
'description': '数据库端口,默认:3306',
'required': False
}, {
'name': 'database',
'type': 'string',
'description': '数据库名称',
'required': True
}, {
'name': 'user',
'type': 'string',
'description': '数据库用户名',
'required': True
}, {
'name': 'password',
'type': 'string',
'description': '数据库密码',
'required': True
}, {
'name': 'sql',
'type': 'string',
'description': 'SQL 查询语句',
'required': True
}]
def call(self, params: str, **kwargs) -> str:
"""执行查询"""
try:
args = json5.loads(params)
# 获取连接参数
config = {
'host': args.get('host', 'localhost'),
'port': args.get('port', 3306),
'database': args['database'],
'user': args['user'],
'password': args['password'],
'charset': 'utf8mb4',
}
sql = args['sql'].strip()
# 安全检查
if not sql.upper().startswith('SELECT'):
raise ValueError('只允许执行 SELECT 查询语句')
# 连接数据库
conn = mysql.connector.connect(** config)
cursor = conn.cursor(dictionary=True)
# 执行查询
cursor.execute(sql)
results = cursor.fetchall()
# 关闭连接
cursor.close()
conn.close()
return json5.dumps({
'success': True,
'sql': sql,
'results': results,
'row_count': len(results),
'message': f'查询成功,共返回 {len(results)} 条记录'
}, ensure_ascii=False)
except Error as e:
return json5.dumps({
'success': False,
'error': '数据库连接错误',
'details': str(e)
}, ensure_ascii=False)
except Exception as e:
return json5.dumps({
'success': False,
'error': '执行失败',
'details': str(e)
}, ensure_ascii=False)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 5.5 数据库 Agent 完整示例
创建文件 database_agent.py:
"""
完整的数据库 Agent
演示如何构建一个智能数据库助手
"""
from qwen_agent.agents import Assistant
from sqlite_tool import SQLiteQuery, SQLiteExecute
class DatabaseAgent:
"""数据库 Agent"""
def __init__(self, db_path='test_database.db'):
"""初始化"""
self.db_path = db_path
# 配置模型
self.llm_cfg = {
'model': 'qwen-max-latest',
'model_type': 'qwen_dashscope',
'generate_cfg': {
'top_p': 0.8,
'temperature': 0.3 # 降低随机性,提高准确性
}
}
# 系统提示词
system_message = f'''
当用户问别的问题的时候,你是用户的小助手,但当用户问门票问题的时候你是门票助手,以下是关于门票订单表相关的字段,我可能会编写对应的SQL,对数据进行查询
-- 门票订单表
CREATE TABLE tkt_orders (
order_time DATETIME, -- 订单日期
account_id INT, -- 预定用户ID
gov_id VARCHAR(18), -- 商品使用人ID(身份证号)
gender VARCHAR(10), -- 使用人性别
age INT, -- 年龄
province VARCHAR(30), -- 使用人省份
SKU VARCHAR(100), -- 商品SKU名
product_serial_no VARCHAR(30), -- 商品ID
eco_main_order_id VARCHAR(20), -- 订单ID
sales_channel VARCHAR(20), -- 销售渠道
status VARCHAR(30), -- 商品状态
order_value DECIMAL(10,2), -- 订单金额
quantity INT -- 商品数量
);
一日门票,对应多种SKU:
Universal Studios Beijing One-Day Dated Ticket-Standard
Universal Studios Beijing One-Day Dated Ticket-Child
Universal Studios Beijing One-Day Dated Ticket-Senior
二日门票,对应多种SKU:
USB 1.5-Day Dated Ticket Standard
USB 1.5-Day Dated Ticket Discounted
一日门票、二日门票查询
SUM(CASE WHEN SKU LIKE 'Universal Studios Beijing One-Day%' THEN quantity ELSE 0 END) AS one_day_ticket_sales,
SUM(CASE WHEN SKU LIKE 'USB%' THEN quantity ELSE 0 END) AS two_day_ticket_sales
我将回答用户关于门票相关的问题
'''
# 创建 Agent
self.bot = Assistant(
llm=self.llm_cfg,
system_message=system_message,
function_list=['sqlite_query', 'sqlite_execute']
)
def query(self, user_input: str) -> str:
"""执行查询"""
messages = [{'role': 'user', 'content': user_input}]
response_text = ''
for response in self.bot.run(messages=messages):
response_text = response[-1]['content']
return response_text
def interactive_mode(self):
"""交互模式"""
print("=== 数据库助手 ===")
print(f"数据库: {self.db_path}")
print("输入 'quit' 退出")
print("=" * 50)
while True:
user_input = input("\n查询: ")
if user_input.lower() in ['quit', 'exit', '退出']:
print("再见!")
break
# 执行查询
result = self.query(user_input)
print(f"\n结果:\n{result}")
# 使用示例
if __name__ == '__main__':
# 创建数据库助手
agent = DatabaseAgent('test_database.db')
# 进入交互模式
agent.interactive_mode()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99