Agent智能体系统的设计与应用
# AI Agent
通过LLM,能够自主理解、规划、执行复杂任务的系统。给它一个目标,AI Agents就能完成剩下的全部工作
- 规划(Planning):将任务分解为较小的、可管理的子目标
- 记忆(Memory):短期记忆,进行上下文学习;长期记忆,一般通过外部载体储存和快速检索来实现。
- 工具使用(Tool use):调用外部API,获取额外信息

# AI Agent对比
| 工具 | 核心定位 | 架构特点 | 适用场景 |
|---|---|---|---|
| LangChain | 开源LLM应用开发框架 | 基于链(Chain)的线性或分支工作流,支持Agent模式 | 快速构建RAG、对话系统、工具调用等线性任务 |
| LangGraph | LangChain的扩展,专注于复杂工作流 | 基于图(Graph)的循环和条件逻辑,支持多Agent协作 | 需要循环、动态分支或状态管理的复杂任务(如自适应RAG、多Agent系统) |
| Qwen-Agent | 通义千问的AI Agent框架 | 基于阿里云大模型,支持多模态交互与工具调用 | 开源,集成多种工具,MCP调用 |
| Coze | 字节跳动的无代码AI Bot平台 | 可视化拖拽界面,内置知识库、多模态插件快 | 速部署社交平台机器人、轻量级工作流 |
| Dify | 开源LLM应用开发平台 | API优先,支持Prompt工程与灵活编排 | 开发者定制化LLM应用,需深度集成或私有化部署 |
# Agent:自主智能体的力量
Agent通常被实现为LLM通过工具调用(基于环境反馈)在循环中执行动作的系统。
正如Anthropic指出的,Agent可以处理复杂的任务,但其实现很简单。它们通常是LLM根据环境反馈使用工具的循环。因此,清晰全面的设计工具集以及文档对于Agent的成功至关重要。

什么时候使用Agent?
Agent适用于那些开放性问题,这些问题很难或无法预测所需的步骤数量,并且无法硬编码固定路径。
LLM可能会运行多个回合,因此你需要对其决策能力有一定的信任。
智能体的自主性使其非常适合在受信任的环境中扩展任务。然而,自主性也意味着更高的成本和可能出现的错误累积。
=>建议在沙盒环境中进行广泛的测试,并设置适当的防护栏。
# 打造Best Practice
AI智能体和工作流是互补的,可以集成在一起以实现最佳效果,尤其是在复杂的现实世界应用中。
增强自动化
AI智能体可以自主处理特定任务,而工作流则将这些任务协调成一个连贯、高效的过程。可扩展性
在结构化工作流中结合多个AI智能体,可以使组织高效扩展运营,减少人工工作量,提高生产力。弹性与适应性
虽然单个智能体可以应对局部变化,但工作流可以动态调整整体流程,用来与战略目标保持一致。
在智能制造系统中:
- AI智能体可以监控设备性能、预测维护需求并优化生产计划。
- 工作流则负责原材料采购、生产排序、质量保证和物流,确保从原材料到产品交付的无缝过渡。
选择适合你的系统才是成功的关键:
- 在AI领域,成功并不是关于构建复杂的系统,而是构建最适合需求的系统。
- 从简单的提示开始,只有在简单方案解决步了时,再添加多步智能体系统。
在实现智能体时,有三个核心原则:
- 保持智能体设计的简洁性:避免不必要的复杂性,专注于核心功能。
- 优先考虑透明性:明确展示智能体的规划步骤,让用户清楚了解其决策过程。
- 打造Function/MCP:打造工具,以及说明文档和测试,确保Agent与外部环境的交互。
框架可以帮助你快速上手,但不要害怕在进入生产阶段时减少抽象层,直接使用基础组件。遵循这些原则,你可以创建出不仅强大而且可靠、可维护且值得用户信赖的智能体系统。
# 智能体分类(反应式Reactive)
- 反应式(Reactive)
反应式架构:快速决策的“直觉型”智能体
反应式架构是AI智能体设计中最简单直接的模式。在这种架构中,一个大型语言模型(LLM)首先分析当前情况,确定下一步要采取的行动。然后,在环境中执行该行动,产生观察结果作为反馈。LLM处理这些观察结果重新评估下一步行动,选择另一个行动,并继续这个循环,直到任务完成。

特点:基于当前环境即时决策,无长期规划,依赖预设规则快速响应。
工作原理:
- 感知:获取环境输入(如传感器数据)。
- 决策:LLM或规则系统立即生成响应动作。
- 执行:执行动作并观察结果,循环往复直至任务完成。
优势:
- 速度快:无复杂推理,适合毫秒级响应的场景(如机器人避障、高频交易)。
- 简单可靠:行为由明确规则驱动,易于设计和验证。
局限:
- 缺乏适应性:无法处理未预见的场景或需多步规划的任务。
- 短视性:仅优化当前动作,可能陷入局部循环(如机器人绕圈)。
典型应用:
- 机器人:扫地机器人避障、无人机紧急悬停。
- 游戏NPC:敌人对玩家攻击的即时反应。
- 工业控制:传感器超限时触发警报或停机。
适用场景:任务规则明确、响应需实时,且无需长期策略的简单环境。
“条件反射”——像膝跳反应一样快速直接,但无法应对复杂变化。
# CASE:私募基金运作指引问答助手
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
私募基金运作指引问答助手 - LangGraph实现
"""
import re
from typing import List, Dict, Any, Annotated
# TypedDict 用于定义带类型的字典。普通字典:{"name": "张三", "age": 25} - 类型不明确 TypedDict:明确每个 key 的类型
from typing_extensions import TypedDict
# langchain_community 是 langchain 的社区集成包,包含各种第三方服务的适配器。
# ChatTongyi 是通义千问的聊天模型封装。
# 为什么用 ChatTongyi 而不是 Tongyi?
# 因为 ChatTongyi 支持 bind_tools(绑定工具),而普通的 Tongyi 不支持
from langchain_community.chat_models import ChatTongyi
# @tool 装饰器:把一个普通 Python 函数变成 langchain 工具。
# 函数的 docstring 会自动变成工具的描述
# 参数的类型注解会自动生成工具的输入 schema
from langchain_core.tools import tool
# 消息类型:langchain 把对话分成三种角色。
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
# StateGraph 是 langgraph 的核心——状态图。
# 你可以把 Agent 想象成一个流程图
# 每个节点是一个处理步骤
# 边定义节点之间的流转
# END 是一个特殊标记,表示流程结束。
from langgraph.graph import StateGraph, END
# ToolNode 是 langgraph 预置的工具执行节点。
# 你不需要自己写"如何调用工具、如何解析结果"
# 它自动帮你执行工具,把结果包装成 ToolMessage
from langgraph.prebuilt import ToolNode
# ToolMessage 是工具执行结果的封装。
# AI 调用工具后,工具的返回值需要包装成 ToolMessage
# 这样 AI 才能理解"这是工具给我的反馈"
from langchain_core.messages import ToolMessage
# 通义千问API密钥
DASHSCOPE_API_KEY = 'sk-cd87191136be484d92f1ac53551440f1'
# 定义Agent的状态
# 这是 Agent 的核心数据结构——整个图的所有节点共享这份状态。
# 逐个拆解:
# TypedDict:定义一个带类型的字典
# messages:字段名,存储对话历史
# list:类型是列表
# Annotated[list, "add_messages"]:这个列表有特殊行为
# "add_messages" 是什么?
# 这是 langgraph 内置的 reducer(归约器)。
# 普通 list:新值会覆盖旧值
# 带 add_messages 的 list:新值会追加到旧值后面
class AgentState(TypedDict):
messages: Annotated[list, "add_messages"]
# 私募基金规则数据库
FUND_RULES_DB = [
{
"id": "rule001",
"category": "设立与募集",
"question": "私募基金的合格投资者标准是什么?",
"answer": "合格投资者是指具备相应风险识别能力和风险承担能力,投资于单只私募基金的金额不低于100万元且符合下列条件之一的单位和个人:\n1. 净资产不低于1000万元的单位\n2. 金融资产不低于300万元或者最近三年个人年均收入不低于50万元的个人"
},
{
"id": "rule002",
"category": "设立与募集",
"question": "私募基金的最低募集规模要求是多少?",
"answer": "私募证券投资基金的最低募集规模不得低于人民币1000万元。"
},
{
"id": "rule014",
"category": "监管规定",
"question": "私募基金管理人的风险准备金要求是什么?",
"answer": "私募证券基金管理人应当按照管理费收入的10%计提风险准备金。"
}
]
# 工具1:关键词搜索
# @tool 装饰器:把这个函数变成 langchain 工具。
# 函数名自动变成工具名:search_by_keywords
# 参数类型注解 keywords: str 告诉 langchain 这个工具接收什么输入
# 返回类型 -> str 告诉 langchain 这个工具返回什么
# docstring(三引号里的文字)自动变成工具的描述,AI 会读取这个描述
@tool
def search_by_keywords(keywords: str) -> str:
"""通过关键词搜索私募基金规则。输入关键词,用空格或逗号分隔。"""
keywords = keywords.strip().lower()
# 这行把用户输入的关键词按逗号、空格分割成列表。
keyword_list = re.split(r'[,,\s]+', keywords)
# 创建一个空列表,存匹配结果
matched_rules = []
for rule in FUND_RULES_DB:
rule_text = (rule["category"] + " " + rule["question"]).lower()
# 统计用户输入的关键词里,有多少个出现在这条规则中
match_count = sum(1 for kw in keyword_list if kw in rule_text)
if match_count > 0:
matched_rules.append((rule, match_count))
# 排序匹配结果,按匹配数量从高到低排。
# lambda x: x[1]:取元组的第二个元素(match_count)作为排序依据
# reverse=True:降序排列(数量大的在前)
matched_rules.sort(key=lambda x: x[1], reverse=True)
if not matched_rules:
return "未找到与关键词相关的规则。"
result = []
# 格式化输出结果。
# matched_rules[:2]:只取前2条(匹配度最高的)
# for rule, _ in ...:_ 表示忽略第二个元素(match_count),只取 rule
# f"...":f-string 格式化字符串
# "\n\n".join(result):用两个换行符拼接结果
for rule, _ in matched_rules[:2]:
result.append(f"类别: {rule['category']}\n问题: {rule['question']}\n答案: {rule['answer']}")
return "\n\n".join(result)
# 工具2:类别查询
@tool
def search_by_category(category: str) -> str:
"""根据类别查询私募基金规则。可选类别:设立与募集、监管规定"""
category = category.strip()
matched_rules = []
for rule in FUND_RULES_DB:
if category in rule["category"]:
matched_rules.append(rule)
if not matched_rules:
return f"未找到类别为 '{category}' 的规则。"
result = []
for rule in matched_rules:
result.append(f"问题: {rule['question']}\n答案: {rule['answer']}")
return "\n\n".join(result)
# 工具3:直接回答问题
@tool
def answer_question(query: str) -> str:
"""直接回答用户问题,尝试在知识库中匹配最相关的规则。"""
query = query.strip()
best_rule = None
best_score = 0
for rule in FUND_RULES_DB:
query_words = set(query.lower().split())
rule_words = set((rule["question"] + " " + rule["category"]).lower().split())
# 取两个集合的交集(共同出现的词)
common_words = query_words.intersection(rule_words)
# 匹配分数 = 共同词数 / 用户问题的词数
score = len(common_words) / max(1, len(query_words))
if score > best_score:
best_score = score
best_rule = rule
if best_score < 0.2 or best_rule is None:
return "这个问题超出了知识库范围,对不起,在我的知识库中没有关于这个主题的详细信息。"
return f"类别: {best_rule['category']}\n问题: {best_rule['question']}\n答案: {best_rule['answer']}"
# 创建 LLM
llm = ChatTongyi(
model="qwen-turbo",
dashscope_api_key=DASHSCOPE_API_KEY
)
# 绑定工具到 LLM
tools = [search_by_keywords, search_by_category, answer_question]
llm_with_tools = llm.bind_tools(tools)
# 系统提示词
SYSTEM_PROMPT = """你是私募基金问答助手。
你可以使用以下工具:
- search_by_keywords: 通过关键词搜索规则
- search_by_category: 按类别查询规则(可选类别:设立与募集、监管规定)
- answer_question: 直接尝试回答用户问题
如果知识库中没有相关信息,请明确告知用户。"""
# Agent 节点:调用 LLM 决定下一步
# 这是一个节点函数,是 StateGraph 中的一个处理步骤。
# 函数签名解释:
# 输入:state: AgentState —— 接收当前状态
# 输出:AgentState —— 返回更新后的状态
def agent_node(state: AgentState) -> AgentState:
"""Agent 思考节点:决定是调用工具还是直接回答"""
# 从状态中取出消息列表。
messages = state["messages"]
# 如果是第一轮,添加系统提示
# isinstance(m, SystemMessage):检查 m 是否是 SystemMessage 类型
# for m in messages:遍历消息列表
# any(...):只要有一个 True 就返回 True
# if not any(...):如果没有 SystemMessage,说明是第一轮
# 为什么要检查?
# 因为 langgraph 的节点会被多次调用:
# 第一轮:用户提问 → agent 思考
# 第二轮:工具返回结果 → agent 再次思考
# ...
# 每次 agent_node 都会收到完整的消息历史,但系统提示只需要添加一次。
if not any(isinstance(m, SystemMessage) for m in messages):
messages = [SystemMessage(content=SYSTEM_PROMPT)] + messages
# 调用 LLM
response = llm_with_tools.invoke(messages)
# 打印 AI 的思考
print("\n" + "=" * 50)
print("【AI 思考】")
if response.content:
print(f" 内容: {response.content}")
# 打印工具调用
if response.tool_calls:
print(" 决定调用工具:")
for tc in response.tool_calls:
print(f" - 工具名: {tc['name']}")
print(f" 参数: {tc['args']}")
print("=" * 50)
# 返回更新后的状态
return {"messages": [response]}
# 定义条件边:决定是否继续调用工具
def should_continue(state: AgentState) -> str:
"""判断是否需要继续调用工具"""
messages = state["messages"]
last_message = messages[-1]
# 如果最后一条消息包含 tool_calls,说明需要调用工具
if last_message.tool_calls:
return "tools"
# 否则结束
return END
# 创建自定义工具执行节点(带打印)
def custom_tool_node(state: AgentState) -> AgentState:
"""执行工具并打印结果"""
messages = state["messages"]
last_message = messages[-1]
tool_messages = []
for tool_call in last_message.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
# 执行工具
for tool in tools:
if tool.name == tool_name:
result = tool.invoke(tool_args)
# 打印工具调用
print("\n" + "-" * 50)
print(f"【工具调用】")
print(f" 工具: {tool_name}")
print(f" 参数: {tool_args}")
print(f" 结果: {result}")
print("-" * 50)
tool_messages.append(ToolMessage(
content=result,
tool_call_id=tool_call["id"]
))
break
return {"messages": tool_messages}
# 构建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", custom_tool_node) # 用自定义的替换 ToolNode
# 设置入口点
workflow.set_entry_point("agent")
# 添加边
# 添加条件边:从 agent 节点出发,根据 should_continue 的返回值决定下一步。
# 参数解释:
# "agent":边的起点
# should_continue:条件函数,返回 "tools" 或 END
# {...}:返回值到目标节点的映射
# "tools" → 走向 tools 节点
# END → 流程结束
workflow.add_conditional_edges(
"agent",
should_continue,
{
"tools": "tools",
END: END
}
)
workflow.add_edge("tools", "agent")
# 编译图
app = workflow.compile()
# 主循环
def main():
print("=== 私募基金运作指引问答助手 ===")
print("输入'退出'结束对话\n")
while True:
user_input = input("请输入您的问题:")
if user_input.lower() in ['退出', 'exit', 'quit']:
print("感谢使用,再见!")
break
# 运行图
result = app.invoke({
"messages": [HumanMessage(content=user_input)]
})
# 输出最终答案
final_message = result["messages"][-1]
print(f"\n回答: {final_message.content}\n")
print("-" * 40)
if __name__ == "__main__":
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
我用一个完整的例子,从用户输入到最终输出,一步步演示。
# 场景:用户问"合格投资者标准是什么?"
# 第1步:用户输入
user_input = "合格投资者标准是什么?"
程序创建初始状态:
state = {
"messages": [
HumanMessage(content="合格投资者标准是什么?")
]
}
2
3
4
5
# 第2步:进入 agent 节点
执行 agent_node 函数:
- 检查消息列表 → 没有 SystemMessage → 添加系统提示
messages = [
SystemMessage(content="你是私募基金问答助手..."),
HumanMessage(content="合格投资者标准是什么?")
]
2
3
4
- 调用 LLM
response = llm_with_tools.invoke(messages)
LLM 看到问题后,决定调用工具,返回:
AIMessage(
content="",
tool_calls=[
{
"name": "search_by_keywords",
"args": {"keywords": "合格投资者"},
"id": "call_abc123"
}
]
)
2
3
4
5
6
7
8
9
10
- 返回新状态
# agent_node 返回
return {"messages": [response]}
# langgraph 自动追加后,状态变成:
state = {
"messages": [
SystemMessage(...),
HumanMessage(content="合格投资者标准是什么?"),
AIMessage(content="", tool_calls=[...]) # 新增
]
}
2
3
4
5
6
7
8
9
10
11
# 第3步:执行条件边 should_continue
执行 should_continue 函数:
last_message = state["messages"][-1] # 刚才的 AIMessage
if last_message.tool_calls: # 有工具调用
return "tools" # 返回 "tools"
2
3
条件边根据返回值 "tools",把流程导向 tools 节点。
# 第4步:进入 tools 节点
执行 custom_tool_node 函数:
- 取出 tool_calls
tool_call = {
"name": "search_by_keywords",
"args": {"keywords": "合格投资者"},
"id": "call_abc123"
}
2
3
4
5
- 执行工具
result = search_by_keywords.invoke({"keywords": "合格投资者"})
# 返回:"类别: 设立与募集\n问题: 私募基金的合格投资者标准是什么?\n答案: 合格投资者是指..."
2
- 创建 ToolMessage
ToolMessage(
content="类别: 设立与募集\n问题: ...",
tool_call_id="call_abc123"
)
2
3
4
- 返回新状态
# custom_tool_node 返回
return {"messages": [ToolMessage(...)]}
# 状态变成:
state = {
"messages": [
SystemMessage(...),
HumanMessage("合格投资者标准是什么?"),
AIMessage(tool_calls=[...]),
ToolMessage(content="类别: 设立与募集...") # 新增
]
}
2
3
4
5
6
7
8
9
10
11
12
# 第5步:固定边 tools → agent
根据 workflow.add_edge("tools", "agent"),流程自动回到 agent 节点。
# 第6步:再次进入 agent 节点
再次执行 agent_node 函数:
检查消息列表 → 已经有 SystemMessage → 不添加
调用 LLM,此时消息列表包含:
- 系统提示
- 用户问题
- AI 的工具调用
- 工具返回的结果
LLM 看到工具结果后,决定直接回答:
AIMessage(content="根据查询结果,私募基金的合格投资者标准是:投资于单只私募基金的金额不低于100万元...")
- 返回新状态
state = {
"messages": [
SystemMessage(...),
HumanMessage("合格投资者标准是什么?"),
AIMessage(tool_calls=[...]),
ToolMessage(...),
AIMessage(content="根据查询结果...") # 新增
]
}
2
3
4
5
6
7
8
9
# 第7步:执行条件边 should_continue
last_message = state["messages"][-1] # AIMessage(content="根据查询结果...")
if last_message.tool_calls: # 空的,没有工具调用
return "tools"
else:
return END # 返回 END
2
3
4
5
条件边返回 END,流程结束。
# 第8步:输出结果
final_message = result["messages"][-1]
print(final_message.content)
# 输出:根据查询结果,私募基金的合格投资者标准是...
2
3
# 完整流程图
用户输入:"合格投资者标准是什么?"
│
▼
┌──────────────┐
│ agent 节点 │ ← 第一次进入
│ AI思考:调工具 │
└──────────────┘
│
tool_calls 不为空
│
▼
┌──────────────┐
│ tools 节点 │
│ 执行工具搜索 │
└──────────────┘
│
│ 固定边
▼
┌──────────────┐
│ agent 节点 │ ← 第二次进入
│ AI思考:直接答 │
└──────────────┘
│
tool_calls 为空
│
▼
【END】
│
▼
输出最终答案
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 总结:LangGraph 的核心
- 状态是核心:所有节点共享同一份状态,通过消息传递信息
- 节点是处理单元:每个节点接收状态、处理、返回更新
- 边控制流转:固定边或条件边决定流程走向
- 循环直到结束:agent → tools → agent → ... → END
# 深思熟虑型AI Agent
特点:基于内部模型进行规划,通过推理选择最优行动方案,具有长期目标导向性。

核心流程:
- 感知:获取环境信息
- 建模:更新内部世界状态表示
- 推理:生成候选计划并模拟结果
- 决策:选择最优方案执行
优势:
能处理多步复杂任务
优化长期目标而非即时反馈
适应动态变化环境
优势:
能处理多步复杂任务
优化长期目标而非即时反馈
适应动态变化环境
典型示例:
路径规划智能体:
- 生成多条候选路线
- 评估安全性/耗时等指标
- 选择最优路径执行
适用场景: 需战略规划的任务(如物流调度、投资决策等)
像下棋高手——每步棋都经过推演,而非凭直觉反应
# CASE:智能投研助手
为投资研究场景设计,基于LangGraph实现的深思熟虑型智能体。
该智能体能够整合市场数据,进行多步骤分析和推理,生成投资观点和研究报告。
与反应式智能体不同,深思熟虑型智能体通过内部建模、推理和规划,制定最优行动方案,特别适合
需要多步骤思考和长期目标优化的任务。
- 内部建模:构建市场模型和行业理解
- 多方案生成:基于市场模型创建多个候选投资策略
- 方案评估:对比分析各方案优劣,考虑多维度因素
- 长期规划:优化整体投资回报,而非单点决策
- 推理透明:能够解释决策过程和选择理由
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
深思熟虑智能体- 智能投研助手
基于LangGraph实现的深思熟虑型智能体,适用于投资研究场景
"""
import json
from typing import Dict, List, Any, Literal, TypedDict, Optional, Union, Tuple
from datetime import datetime
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatTongyi
from langchain_core.output_parsers import StrOutputParser
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
# 设置API密钥
DASHSCOPE_API_KEY = 'sk-cd87191136be484d92f1ac53551440f1'
# 创建LLM实例
llm = ChatTongyi(
model="qwen-turbo",
dashscope_api_key=DASHSCOPE_API_KEY
)
# 定义输出模型
class PerceptionOutput(BaseModel):
"""感知阶段输出的市场数据和信息"""
market_overview: str = Field(..., description="市场概况和最新动态")
key_indicators: Dict[str, str] = Field(..., description="关键经济和市场指标")
recent_news: List[str] = Field(..., description="近期重要新闻")
industry_trends: Dict[str, str] = Field(..., description="行业趋势分析")
class ModelingOutput(BaseModel):
"""建模阶段输出的内部世界模型"""
market_state: str = Field(..., description="当前市场状态评估")
economic_cycle: str = Field(..., description="经济周期判断")
risk_factors: List[str] = Field(..., description="主要风险因素")
opportunity_areas: List[str] = Field(..., description="潜在机会领域")
market_sentiment: str = Field(..., description="市场情绪分析")
class ReasoningPlan(BaseModel):
"""推理阶段生成的候选分析方案"""
plan_id: str = Field(..., description="方案ID")
hypothesis: str = Field(..., description="投资假设")
analysis_approach: str = Field(..., description="分析方法")
expected_outcome: str = Field(..., description="预期结果")
confidence_level: float = Field(..., description="置信度(0-1)")
pros: List[str] = Field(..., description="方案优势")
cons: List[str] = Field(..., description="方案劣势")
class DecisionOutput(BaseModel):
"""决策阶段选择的最优投资观点"""
selected_plan_id: str = Field(..., description="选中的方案ID")
investment_thesis: str = Field(..., description="投资论点")
supporting_evidence: List[str] = Field(..., description="支持证据")
risk_assessment: str = Field(..., description="风险评估")
recommendation: str = Field(..., description="投资建议")
timeframe: str = Field(..., description="时间框架")
# 定义智能体状态
class ResearchAgentState(TypedDict):
"""研究智能体的状态"""
# 输入
research_topic: str # 研究主题
industry_focus: str # 行业焦点
time_horizon: str # 时间范围(短期/中期/长期)
# 处理状态
perception_data: Optional[Dict[str, Any]] # 感知阶段收集的数据
world_model: Optional[Dict[str, Any]] # 内部世界模型
reasoning_plans: Optional[List[Dict[str, Any]]] # 候选分析方案
selected_plan: Optional[Dict[str, Any]] # 选中的最优方案
# 输出
final_report: Optional[str] # 最终研究报告
# 控制流
current_phase: Literal["perception", "modeling", "reasoning", "decision", "report", "completed"]
error: Optional[str] # 错误信息
# 提示模板
PERCEPTION_PROMPT = """你是一个专业的投资研究分析师,请收集和整理关于以下研究主题的市场数据和信息:
研究主题: {research_topic}
行业焦点: {industry_focus}
时间范围: {time_horizon}
请从以下几个方面进行市场感知:
1. 市场概况和最新动态
2. 关键经济和市场指标
3. 近期重要新闻(至少3条)
4. 行业趋势分析(至少针对3个细分领域)
根据你的专业知识和经验,提供尽可能详细和准确的信息。
输出格式要求为JSON,包含以下字段:
- market_overview: 字符串
- key_indicators: 字典,键为指标名称,值为指标值和简要解释
- recent_news: 字符串列表,每项为一条重要新闻
- industry_trends: 字典,键为细分领域,值为趋势分析
"""
MODELING_PROMPT = """你是一个资深投资策略师,请根据以下市场数据和信息,构建市场内部模型,进行深度分析:
研究主题: {research_topic}
行业焦点: {industry_focus}
时间范围: {time_horizon}
市场数据和信息: {perception_data}
请构建一个全面的市场内部模型,包括:
1. 当前市场状态评估
2. 经济周期判断
3. 主要风险因素(至少3个)
4. 潜在机会领域(至少3个)
5. 市场情绪分析
输出格式要求为JSON,包含以下字段:
- market_state: 字符串
- economic_cycle: 字符串
- risk_factors: 字符串列表
- opportunity_areas: 字符串列表
- market_sentiment: 字符串
"""
REASONING_PROMPT = """你是一个战略投资顾问,请根据以下市场模型,生成3个不同的投资分析方案:
研究主题: {research_topic}
行业焦点: {industry_focus}
时间范围: {time_horizon}
市场内部模型: {world_model}
请为每个方案提供:
1. 方案ID(简短标识符)
2. 投资假设
3. 分析方法
4. 预期结果
5. 置信度(0-1之间的小数)
6. 方案优势(至少3点)
7. 方案劣势(至少2点)
这些方案应该有明显的差异,代表不同的投资思路或分析角度。
输出格式要求为JSON数组,每个元素包含以下字段:
- plan_id: 字符串
- hypothesis: 字符串
- analysis_approach: 字符串
- expected_outcome: 字符串
- confidence_level: 浮点数
- pros: 字符串列表
- cons: 字符串列表
"""
DECISION_PROMPT = """你是一个投资决策委员会主席,请评估以下候选分析方案,选择最优方案并形成投资决策:
研究主题: {research_topic}
行业焦点: {industry_focus}
时间范围: {time_horizon}
市场内部模型: {world_model}
候选分析方案: {reasoning_plans}
请基于方案的假设、分析方法、预期结果、置信度以及优缺点,选择最优的投资方案,并给出详细的决策理由。
你的决策应该综合考虑投资潜力、风险水平和时间框架的匹配度。
输出格式要求为JSON,包含以下字段:
- selected_plan_id: 字符串
- investment_thesis: 字符串
- supporting_evidence: 字符串列表
- risk_assessment: 字符串
- recommendation: 字符串
- timeframe: 字符串
"""
REPORT_PROMPT = """你是一个专业的投资研究报告撰写人,请根据以下信息生成一份完整的投资研究报告:
研究主题: {research_topic}
行业焦点: {industry_focus}
时间范围: {time_horizon}
市场数据和信息: {perception_data}
市场内部模型: {world_model}
选定的投资决策: {selected_plan}
请生成一份结构完整、逻辑清晰的投研报告,包括但不限于:
1. 报告标题和摘要
2. 市场和行业背景
3. 核心投资观点
4. 详细分析论证
5. 风险因素
6. 投资建议
7. 时间框架和预期回报
报告应当专业、客观,同时提供足够的分析深度和洞见。
"""
# 第一阶段:感知 - 收集市场数据和信息
def perception(state: ResearchAgentState) -> Dict[str, Any]:
"""感知阶段:收集和整理市场数据和信息"""
print("1. 感知阶段:收集市场数据和信息...")
try:
# 准备提示
prompt = ChatPromptTemplate.from_template(PERCEPTION_PROMPT)
# 构建输入
input_data = {
"research_topic": state["research_topic"],
"industry_focus": state["industry_focus"],
"time_horizon": state["time_horizon"]
}
# 调用LLM
chain = prompt | llm | StrOutputParser()
result_str = chain.invoke(input_data)
# 解析JSON
try:
result = json.loads(result_str)
except json.JSONDecodeError:
# 尝试提取JSON部分
import re
json_match = re.search(r'\{[\s\S]*\}', result_str)
if json_match:
result = json.loads(json_match.group())
else:
raise ValueError("无法解析LLM返回的JSON")
# 更新状态
return {
"perception_data": result,
"current_phase": "modeling"
}
except Exception as e:
return {
"error": f"感知阶段出错: {str(e)}",
"current_phase": "perception"
}
# 第二阶段:建模 - 构建内部世界模型
def modeling(state: ResearchAgentState) -> Dict[str, Any]:
"""建模阶段:构建内部世界模型,理解市场状态"""
print("2. 建模阶段:构建内部世界模型...")
try:
# 确保感知数据已存在
if not state.get("perception_data"):
return {
"error": "建模阶段缺少感知数据",
"current_phase": "perception"
}
# 准备提示
prompt = ChatPromptTemplate.from_template(MODELING_PROMPT)
# 构建输入
input_data = {
"research_topic": state["research_topic"],
"industry_focus": state["industry_focus"],
"time_horizon": state["time_horizon"],
"perception_data": json.dumps(state["perception_data"], ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | StrOutputParser()
result_str = chain.invoke(input_data)
# 解析JSON
try:
result = json.loads(result_str)
except json.JSONDecodeError:
import re
json_match = re.search(r'\{[\s\S]*\}', result_str)
if json_match:
result = json.loads(json_match.group())
else:
raise ValueError("无法解析LLM返回的JSON")
# 更新状态
return {
"world_model": result,
"current_phase": "reasoning"
}
except Exception as e:
return {
"error": f"建模阶段出错: {str(e)}",
"current_phase": "modeling"
}
# 第三阶段:推理 - 生成候选分析方案
def reasoning(state: ResearchAgentState) -> Dict[str, Any]:
"""推理阶段:生成多个候选分析方案并模拟结果"""
print("3. 推理阶段:生成候选分析方案...")
try:
# 确保世界模型已存在
if not state.get("world_model"):
return {
"error": "推理阶段缺少世界模型",
"current_phase": "modeling"
}
# 准备提示
prompt = ChatPromptTemplate.from_template(REASONING_PROMPT)
# 构建输入
input_data = {
"research_topic": state["research_topic"],
"industry_focus": state["industry_focus"],
"time_horizon": state["time_horizon"],
"world_model": json.dumps(state["world_model"], ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | StrOutputParser()
result_str = chain.invoke(input_data)
# 解析JSON
try:
result = json.loads(result_str)
except json.JSONDecodeError:
import re
json_match = re.search(r'\[[\s\S]*\]', result_str)
if json_match:
result = json.loads(json_match.group())
else:
raise ValueError("无法解析LLM返回的JSON数组")
# 更新状态
return {
"reasoning_plans": result,
"current_phase": "decision"
}
except Exception as e:
return {
"error": f"推理阶段出错: {str(e)}",
"current_phase": "reasoning"
}
# 第四阶段:决策 - 选择最优方案
def decision(state: ResearchAgentState) -> Dict[str, Any]:
"""决策阶段:评估候选方案并选择最优投资观点"""
print("4. 决策阶段:选择最优投资观点...")
try:
# 确保候选方案已存在
if not state.get("reasoning_plans"):
return {
"error": "决策阶段缺少候选方案",
"current_phase": "reasoning"
}
# 准备提示
prompt = ChatPromptTemplate.from_template(DECISION_PROMPT)
# 构建输入
input_data = {
"research_topic": state["research_topic"],
"industry_focus": state["industry_focus"],
"time_horizon": state["time_horizon"],
"world_model": json.dumps(state["world_model"], ensure_ascii=False, indent=2),
"reasoning_plans": json.dumps(state["reasoning_plans"], ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | StrOutputParser()
result_str = chain.invoke(input_data)
# 解析JSON
try:
result = json.loads(result_str)
except json.JSONDecodeError:
import re
json_match = re.search(r'\{[\s\S]*\}', result_str)
if json_match:
result = json.loads(json_match.group())
else:
raise ValueError("无法解析LLM返回的JSON")
# 更新状态
return {
"selected_plan": result,
"current_phase": "report"
}
except Exception as e:
return {
"error": f"决策阶段出错: {str(e)}",
"current_phase": "decision"
}
# 第五阶段:报告 - 生成完整研究报告
def report_generation(state: ResearchAgentState) -> Dict[str, Any]:
"""报告阶段:生成完整的投资研究报告"""
print("5. 报告阶段:生成完整研究报告...")
try:
# 确保选定方案已存在
if not state.get("selected_plan"):
return {
"error": "报告阶段缺少选定方案",
"current_phase": "decision"
}
# 准备提示
prompt = ChatPromptTemplate.from_template(REPORT_PROMPT)
# 构建输入
input_data = {
"research_topic": state["research_topic"],
"industry_focus": state["industry_focus"],
"time_horizon": state["time_horizon"],
"perception_data": json.dumps(state["perception_data"], ensure_ascii=False, indent=2),
"world_model": json.dumps(state["world_model"], ensure_ascii=False, indent=2),
"selected_plan": json.dumps(state["selected_plan"], ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | StrOutputParser()
result = chain.invoke(input_data)
# 更新状态
return {
"final_report": result,
"current_phase": "completed"
}
except Exception as e:
return {
"error": f"报告生成阶段出错: {str(e)}",
"current_phase": "report"
}
# 创建智能体工作流图
def create_research_agent_workflow() -> StateGraph:
"""创建深思熟虑型研究智能体工作流图"""
# 创建状态图
workflow = StateGraph(ResearchAgentState)
# 添加节点
workflow.add_node("perception", perception)
workflow.add_node("modeling", modeling)
workflow.add_node("reasoning", reasoning)
workflow.add_node("decision", decision)
workflow.add_node("report", report_generation)
# 设置入口点
workflow.set_entry_point("perception")
# 设置边(线性流程)
workflow.add_edge("perception", "modeling")
workflow.add_edge("modeling", "reasoning")
workflow.add_edge("reasoning", "decision")
workflow.add_edge("decision", "report")
workflow.add_edge("report", END)
# 编译工作流
return workflow.compile()
# 测试函数
def run_research_agent(topic: str, industry: str, horizon: str) -> Dict[str, Any]:
"""运行研究智能体并返回结果"""
# 创建工作流
agent = create_research_agent_workflow()
# 准备初始状态
initial_state = {
"research_topic": topic,
"industry_focus": industry,
"time_horizon": horizon,
"perception_data": None,
"world_model": None,
"reasoning_plans": None,
"selected_plan": None,
"final_report": None,
"current_phase": "perception",
"error": None
}
print("LangGraph Mermaid流程图:")
print(agent.get_graph().draw_mermaid())
# 运行智能体
result = agent.invoke(initial_state)
return result
# 主函数
if __name__ == "__main__":
print("=== 深思熟虑智能体 - 智能投研助手 ===\n")
print("使用模型:qwen-turbo\n")
# 用户输入
topic = input("请输入研究主题 (例如: 新能源汽车行业投资机会): ")
industry = input("请输入行业焦点 (例如: 电动汽车制造、电池技术): ")
horizon = input("请输入时间范围 [短期/中期/长期]: ")
print("\n智能投研助手开始工作...\n")
try:
# 运行智能体
result = run_research_agent(topic, industry, horizon)
# 处理结果
if result.get("error"):
print(f"\n发生错误: {result['error']}")
else:
print("\n=== 最终研究报告 ===\n")
print(result.get("final_report", "未生成报告"))
# 保存报告
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"research_report_{timestamp}.txt"
with open(filename, "w", encoding="utf-8") as f:
f.write(result.get("final_report", "未生成报告"))
print(f"\n报告已保存为: {filename}")
except Exception as e:
print(f"\n运行过程中发生错误: {str(e)}")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
好,我来帮你把这个代码彻底拆解明白。既然你不会 Python,咱们从最基础的概念开始。
# 一、代码在做什么?(先有全局认识)
这段代码构建了一个投研助手,它像一个研究分析师的工作流:
感知 → 建模 → 推理 → 决策 → 生成报告
比如你问"新能源汽车投资机会",它会:
- 感知:收集市场数据、新闻、行业趋势
- 建模:分析当前市场状态、经济周期、风险因素
- 推理:生成多个投资方案(比如看多、看空、中性)
- 决策:选一个最优方案
- 报告:写成完整的研究报告
LangGraph 的作用:把这个流程变成一张可编程的流程图,每个阶段是一个节点,节点之间用边连接。
# 二、Python 基础知识清单
先过一遍代码里用到的 Python 语法,我会举一反三:
# 1. import 导入库
import os
import json
from typing import Dict, List, Any
2
3
类比:就像你要做菜,需要先从厨房拿工具。
import os:拿操作系统工具(读文件、环境变量等)import json:拿 JSON 解析工具(处理 JSON 格式数据)from typing import Dict:从 typing 工具箱里只拿 Dict 这个工具
举一反三:
import datetime # 拿日期时间工具
from datetime import datetime # 只拿 datetime 这个具体工具
2
# 2. 类型注解(Python 3.5+ 的特性)
from typing import Dict, List, Any, Literal, TypedDict, Optional
def perception(state: ResearchAgentState) -> Dict[str, Any]:
2
3
含义:
state: ResearchAgentState:参数 state 的类型是 ResearchAgentState-> Dict[str, Any]:返回值的类型是字典(键是字符串,值可以是任何类型)
类比:就像在菜单上标注"这道菜是辣的"——方便别人理解,但 Python 运行时不会强制检查。
举一反三:
def add(a: int, b: int) -> int:
return a + b
name: str = "张三" # 变量也可以加类型注解
age: int = 25
2
3
4
5
# 3. 类和继承
from pydantic import BaseModel, Field
class PerceptionOutput(BaseModel):
market_overview: str = Field(..., description="市场概况和最新动态")
key_indicators: Dict[str, str] = Field(..., description="关键经济和市场指标")
2
3
4
5
含义:
class PerceptionOutput(BaseModel):定义一个类,继承自 BaseModelBaseModel:Pydantic 提供的基类,可以自动做数据验证market_overview: str:这个类有一个属性叫 market_overview,类型是字符串Field(..., description="..."):设置字段属性,...表示必填
类比:
# 不用 Pydantic 的普通类
class Dog:
def __init__(self, name, age):
self.name = name
self.age = age
# 使用 Pydantic 的类(自动验证数据)
from pydantic import BaseModel
class Dog(BaseModel):
name: str
age: int
# 自动验证:age 传字符串会报错
dog = Dog(name="旺财", age="三岁") # 报错!age 必须是 int
2
3
4
5
6
7
8
9
10
11
12
13
14
15
举一反三:
class User(BaseModel):
name: str = Field(..., description="用户名")
age: int = Field(default=18, description="年龄,默认18") # 有默认值,非必填
user = User(name="张三") # age 自动填 18
2
3
4
5
# 4. TypedDict(类型安全的字典)
from typing import TypedDict, Optional, Literal
class ResearchAgentState(TypedDict):
research_topic: str
perception_data: Optional[Dict[str, Any]]
current_phase: Literal["perception", "modeling", "reasoning", "decision", "report", "completed"]
2
3
4
5
6
含义:
TypedDict:定义一个字典结构,说明每个键的类型Optional[...]:可以是这个类型,也可以是 NoneLiteral["a", "b"]:只能是这几个字符串中的一个
类比:
# 普通字典,没有类型约束
state = {
"research_topic": "新能源",
"perception_data": None,
"current_phase": "unknown" # 打错了也不会报错
}
# TypedDict 字典,有类型约束(静态检查工具会发现错误)
class State(TypedDict):
phase: Literal["start", "end"]
state: State = {"phase": "middle"} # 静态检查会报错
2
3
4
5
6
7
8
9
10
11
12
# 5. 函数定义和返回值
def perception(state: ResearchAgentState) -> Dict[str, Any]:
"""感知阶段:收集和整理市场数据和信息"""
# ... 省略代码 ...
return {
"perception_data": result,
"current_phase": "modeling"
}
2
3
4
5
6
7
含义:
def perception(...):定义一个函数"""感知阶段...""":文档字符串,说明函数作用return {...}:返回一个字典
举一反三:
# 无参数无返回值
def say_hello():
print("你好")
# 有参数有返回值
def add(a: int, b: int) -> int:
return a + b
# 返回多个值(实际是元组)
def get_info() -> tuple[str, int]:
return "张三", 25
name, age = get_info() # 解构赋值
2
3
4
5
6
7
8
9
10
11
12
13
# 三、LangChain 核心概念
现在进入 LangChain 部分,这是代码的核心。
# 1. ChatPromptTemplate(提示词模板)
from langchain_core.prompts import ChatPromptTemplate
PERCEPTION_PROMPT = """你是一个专业的投资研究分析师...
研究主题: {research_topic}
行业焦点: {industry_focus}
"""
prompt = ChatPromptTemplate.from_template(PERCEPTION_PROMPT)
2
3
4
5
6
7
8
含义:
- 定义一个带占位符的提示词模板
{research_topic}、{industry_focus}是变量ChatPromptTemplate.from_template()把字符串转成模板对象
类比:
# 就像填空题
template = "你好,{name},你今年{age}岁。"
# 填空
result = template.format(name="张三", age=25)
print(result) # 你好,张三,你今年25岁。
2
3
4
5
6
举一反三:
# LangChain 的做法
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("翻译成英文:{text}")
formatted = prompt.format(text="你好世界")
print(formatted) # 翻译成英文:你好世界
2
3
4
5
6
# 2. ChatTongyi(通义千问模型)
from langchain_community.chat_models import ChatTongyi
llm = ChatTongyi(
model="qwen-turbo",
dashscope_api_key=DASHSCOPE_API_KEY
)
2
3
4
5
6
含义:
- 创建一个通义千问的聊天模型实例
model="qwen-turbo":指定使用哪个模型dashscope_api_key:API 密钥
类比:
# 就像创建一个 AI 客服
ai = ChatTongyi(model="qwen-turbo")
# 向 AI 提问
response = ai.invoke("你好")
print(response.content) # AI 的回复
2
3
4
5
6
举一反三(换其他模型):
# OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
# 本地模型(Ollama)
from langchain_community.chat_models import ChatOllama
llm = ChatOllama(model="llama2")
2
3
4
5
6
7
# 3. StrOutputParser(字符串输出解析器)
from langchain_core.output_parsers import StrOutputParser
parser = StrOutputParser()
2
3
含义:
- LLM 返回的是一个复杂的对象(包含 content、metadata 等)
StrOutputParser只提取其中的文本内容
类比:
# LLM 返回的结构
response = AIMessage(
content="你好",
response_metadata={"model": "qwen-turbo"},
id="xxx"
)
# 用解析器只提取 content
parser = StrOutputParser()
text = parser.invoke(response) # "你好"
2
3
4
5
6
7
8
9
10
# 4. Chain(链式组合)
chain = prompt | llm | StrOutputParser()
result_str = chain.invoke(input_data)
2
含义:
|操作符把三个组件串联起来- 数据流:
input_data → prompt → llm → parser → result_str - 相当于:
parser.invoke(llm.invoke(prompt.invoke(input_data)))
类比:
# 就像工厂流水线
原材料(input_data)
→ 加工站1(prompt:格式化)
→ 加工站2(llm:AI处理)
→ 加工站3(parser:提取结果)
→ 成品(result_str)
2
3
4
5
6
举一反三:
# 不用 | 的写法
formatted = prompt.invoke(input_data)
response = llm.invoke(formatted)
result = parser.invoke(response)
# 用 | 的写法(推荐)
chain = prompt | llm | parser
result = chain.invoke(input_data)
2
3
4
5
6
7
8
# 四、LangGraph 核心概念
这是这段代码最核心的部分。
# 1. StateGraph(状态图)
from langgraph.graph import StateGraph, END
workflow = StateGraph(ResearchAgentState)
2
3
含义:
StateGraph:状态图,用于构建工作流ResearchAgentState:定义状态的数据结构END:工作流的终点
类比:
- 就像画一个流程图,每个节点是一个函数,边是连线
- State 是贯穿整个流程的数据载体,每个节点可以读取和修改它
举一反三:
from langgraph.graph import StateGraph, END
from typing import TypedDict
# 定义状态
class MyState(TypedDict):
counter: int
message: str
# 创建状态图
graph = StateGraph(MyState)
# 添加节点和边...
2
3
4
5
6
7
8
9
10
11
12
# 2. 节点和边
# 添加节点
workflow.add_node("perception", perception)
workflow.add_node("modeling", modeling)
# 设置入口点
workflow.set_entry_point("perception")
# 设置边
workflow.add_edge("perception", "modeling")
workflow.add_edge("modeling", "reasoning")
workflow.add_edge("reasoning", "decision")
workflow.add_edge("decision", "report")
workflow.add_edge("report", END)
2
3
4
5
6
7
8
9
10
11
12
13
含义:
add_node("节点名", 函数):添加一个节点,节点执行这个函数set_entry_point("节点名"):设置从哪个节点开始add_edge("A", "B"):A 执行完后执行 BEND:流程结束
类比:
入口 → perception → modeling → reasoning → decision → report → END
举一反三(条件分支):
def should_continue(state):
if state["counter"] < 3:
return "loop"
return "end"
workflow.add_conditional_edges(
"check",
should_continue,
{"loop": "process", "end": END}
)
2
3
4
5
6
7
8
9
10
# 3. 编译和运行
# 编译工作流
agent = workflow.compile()
# 运行
result = agent.invoke(initial_state)
2
3
4
5
含义:
compile():把工作流图编译成可执行的对象invoke():运行工作流,传入初始状态
类比:
compile():把流程图翻译成机器能执行的代码invoke():启动机器,输入初始数据
# 五、代码逐块解读
现在我把代码分成几个块,逐块解释:
# 块1:导入和配置
import os
import json
from typing import Dict, List, Any, Literal, TypedDict, Optional
from datetime import datetime
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatTongyi
from langchain_core.output_parsers import StrOutputParser
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
DASHSCOPE_API_KEY = 'sk-xxx'
llm = ChatTongyi(model="qwen-turbo", dashscope_api_key=DASHSCOPE_API_KEY)
2
3
4
5
6
7
8
9
10
11
12
13
解读:
- 导入各种工具库
- 设置通义千问的 API 密钥
- 创建 LLM 实例(相当于创建一个 AI 对话对象)
# 块2:定义输出模型(Pydantic)
class PerceptionOutput(BaseModel):
market_overview: str = Field(..., description="市场概况")
key_indicators: Dict[str, str] = Field(..., description="关键指标")
recent_news: List[str] = Field(..., description="近期新闻")
industry_trends: Dict[str, str] = Field(..., description="行业趋势")
2
3
4
5
解读:
- 定义感知阶段输出数据的结构
- Pydantic 会自动验证数据是否符合这个结构
- 但代码里实际没有用这些类,只是定义了结构参考
# 块3:定义状态
class ResearchAgentState(TypedDict):
research_topic: str
industry_focus: str
time_horizon: str
perception_data: Optional[Dict[str, Any]]
world_model: Optional[Dict[str, Any]]
reasoning_plans: Optional[List[Dict[str, Any]]]
selected_plan: Optional[Dict[str, Any]]
final_report: Optional[str]
current_phase: Literal["perception", "modeling", "reasoning", "decision", "report", "completed"]
error: Optional[str]
2
3
4
5
6
7
8
9
10
11
解读:
- 定义整个工作流的数据结构
- 每个阶段会更新对应字段
current_phase记录当前在哪个阶段
# 块4:提示词模板
PERCEPTION_PROMPT = """你是一个专业的投资研究分析师...
研究主题: {research_topic}
行业焦点: {industry_focus}
...
"""
2
3
4
5
解读:
- 定义每个阶段的提示词
{变量名}是占位符,运行时替换
# 块5:节点函数(以 perception 为例)
def perception(state: ResearchAgentState) -> Dict[str, Any]:
"""感知阶段:收集市场数据和信息"""
print("1. 感知阶段...")
# 创建提示词模板
prompt = ChatPromptTemplate.from_template(PERCEPTION_PROMPT)
# 准备输入数据
input_data = {
"research_topic": state["research_topic"],
"industry_focus": state["industry_focus"],
"time_horizon": state["time_horizon"]
}
# 构建 chain 并调用
chain = prompt | llm | StrOutputParser()
result_str = chain.invoke(input_data)
# 解析 JSON
result = json.loads(result_str)
# 返回状态更新
return {
"perception_data": result,
"current_phase": "modeling"
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
解读:
- 从 state 里读取输入数据
- 构建 chain:prompt → llm → parser
- 调用 chain 得到 AI 的回复
- 解析 JSON 格式的回复
- 返回要更新的状态字段
其他节点类似,只是提示词和返回字段不同。
# 块6:创建工作流
def create_research_agent_workflow() -> StateGraph:
workflow = StateGraph(ResearchAgentState)
# 添加节点
workflow.add_node("perception", perception)
workflow.add_node("modeling", modeling)
workflow.add_node("reasoning", reasoning)
workflow.add_node("decision", decision)
workflow.add_node("report", report_generation)
# 设置入口
workflow.set_entry_point("perception")
# 添加边
workflow.add_edge("perception", "modeling")
workflow.add_edge("modeling", "reasoning")
workflow.add_edge("reasoning", "decision")
workflow.add_edge("decision", "report")
workflow.add_edge("report", END)
return workflow.compile()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
解读:
- 创建一个线性流程图
- 节点按顺序执行:感知 → 建模 → 推理 → 决策 → 报告
# 块7:运行工作流
def run_research_agent(topic, industry, horizon):
agent = create_research_agent_workflow()
initial_state = {
"research_topic": topic,
"industry_focus": industry,
"time_horizon": horizon,
"perception_data": None,
"world_model": None,
# ... 其他字段初始化为 None
}
result = agent.invoke(initial_state)
return result
2
3
4
5
6
7
8
9
10
11
12
13
14
解读:
- 创建工作流实例
- 准备初始状态
- 运行工作流,得到最终状态
# 六、核心知识点总结
# Python 知识点:
import导入库- 类型注解(
typing模块) - 类和继承(
class) - Pydantic 数据模型
- 函数定义和返回值
- 字典和列表
# LangChain 知识点:
ChatPromptTemplate:提示词模板ChatTongyi(或其他 LLM):AI 模型StrOutputParser:输出解析器|链式组合
# LangGraph 知识点:
StateGraph:状态图add_node:添加节点add_edge:添加边compile():编译invoke():运行
# 七、举一反三:如何修改这个代码?
# 场景1:换一个模型
# 换成 OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", openai_api_key="your-key")
2
3
4
# 场景2:添加一个新节点
# 假设要添加一个"风险分析"节点
def risk_analysis(state):
prompt = ChatPromptTemplate.from_template(RISK_PROMPT)
chain = prompt | llm | StrOutputParser()
result = chain.invoke(state)
return {"risk_report": result, "current_phase": "report"}
# 在工作流中添加
workflow.add_node("risk", risk_analysis)
workflow.add_edge("decision", "risk")
workflow.add_edge("risk", "report")
2
3
4
5
6
7
8
9
10
11
# 场景3:添加条件分支
def should_generate_report(state):
if state.get("confidence", 0) > 0.7:
return "report"
return "retry"
workflow.add_conditional_edges(
"decision",
should_generate_report,
{"report": "report", "retry": "reasoning"}
)
2
3
4
5
6
7
8
9
10
# 智能体分类(混合Hybrid)
特点:结合反应式的"快速本能"和深思熟虑的"战略规划",实现智能与效率的平衡。

三层设计:
- 底层(反应式):即时处理紧急任务(如避障)
- 中层(协调):管理任务优先级(可选)
- 顶层(深思熟虑):进行长期目标规划(如路径优化)
运作机制:
通过仲裁系统(如监督器)动态切换模式:
- 紧急情况→启用反应式快速响应
- 常规情况→启动深思熟虑规划
典型示例:
自动驾驶车辆:
- 突发障碍→立即刹车(反应式)
- 正常行驶→规划最优路线(深思熟虑)
核心优势:
- 兼具实时响应能力(毫秒级)
- 保留战略规划优势(长期目标)
# CASE:投顾AI助手
基于混合智能体架构设计,结合了反应式架构的即时响应能力和深思熟虑架构的长期规划能力,通过协调层动态选择最合适的处理模式,为客户提供智能化、个性化的财富管理咨询服务。
混合智能体采用三层架构设计:
- 底层(反应式层):
- 处理简单直接的查询,如市场状况、账户信息等
- 毫秒级响应速度,提供快速反馈
- 基于预设规则和简单逻辑作出决策
- 中层(协调层):
- 评估任务类型和优先级
- 动态选择处理模式(反应式或深思熟虑)
- 管理系统资源分配
- 顶层(深思熟虑层):
- 处理复杂的投资分析和长期财务规划
- 多步骤、深度思考过程
- 构建内部模型并生成多个备选方案
# 构建Agent的核心思想
- 不要为所有任务构建Agent
适用场景:Agent适合处理复杂、模糊且高价值的任务,而非所有场景。
判断标准:
- 任务复杂性:若决策树可明确规划,直接构建工作流更高效。
- 任务价值:高成本(如大量Token消耗)需由高回报任务承担。
- 关键能力验证:确保Agent能处理核心子任务(如代码生成、调试)。
- 错误成本:高风险的错误需通过限制权限或人工介入来缓解。
案例:代码生成是理想场景,因其复杂性高、价值大且输出易验证(如通过单元测试)。
- 保持简洁
Agent的核心组件:
- 环境:Agent的操作系统。
- 工具集:提供行动接口和反馈机制。
- 系统提示:定义目标、约束和预期行为。
设计原则:
- 初期避免过度复杂化,优先迭代核心组件。
- 优化(如成本、延迟)可在基础行为稳定后进行。
案例:不同功能的Agent可共享相同代码框架,仅调整工具和提示。
- 像Agent一样思考 理解Agent的局限性:
- Agent仅基于有限上下文(10-20k Token)做决策。
- 需模拟Agent的视角(如仅通过静态截图操作电脑)以发现设计缺陷。
改进方法:
- 直接询问模型(如Claude)以验证指令清晰度或工具使用合理性。
- 分析轨迹日志,优化上下文提供方式(如分辨率信息、操作建议)。