RAG的高级技巧

更新时间: 2026-03-13 14:53:27

# RAG高效召回方法

  • 都有哪些RAG召回的策略,提升召回的质量?

# 改进检索算法

知识图谱:利用知识图谱中的语义信息和实体关系,增强对查询和文档的理解,提升召回的相关性

# 引入重排序(Reranking)

重排序模型:对召回结果进行重排,提升问题和文档的相关性。常见的重排序模型有BGE-Rerank和CohereRerank。

场景:用户查询“如何提高深度学习模型的训练效率?”
召回结果:初步召回10篇文档,其中包含与“深度学习”、“训练效率”相关的文章。
重排序:BGE-Rerank对召回的10篇文档进行重新排序,将与“训练效率”最相关的文档(如“优化深度学习训练的技巧”)排在最前面,而将相关性较低的文档(如“深度学习基础理论”)排在后面。

混合检索:结合向量检索和关键词检索的优势,通过重排序模型对结果进行归一化处理,提升召回质量

什么是重排序Rerank?

重排序Rerank主要用于优化初步检索结果的排序,提高最终输出的相关性或准确性。BGE-Rerank和CohereRerank是两种广泛使用的重排序模型,它们在检索增强生成(RAG)系统、搜索引擎优化和问答系统中表现优异。

  • BGE-Rerank 由北京智源人工智能研究院(BAAI)开源发布,属于FlagEmbedding项目的一部分。
    基于Transformer的Cross-Encoder结构,直接计算查询(Query)与文档(Document)的交互相关性得分。
    训练数据:支持多语言(中、英等),训练数据包括T2Ranking、MSMARCO、NLI等数据集。
    提供bge-reranker-base和bge-reranker-large两个版本,后者在精度上更优。
    部署方式:可本地部署
    开源免费,适合本地化部署,保护数据隐私。
    在中文任务中表现优秀,适用于垂直领域优化

    1. CohereRerank 由Cohere公司提供的商业API服务。
      基于专有的深度学习模型,支持多语言(如rerank-multilingual-v3.0)。
      训练数据:优化了语义匹配,特别适用于混合检索(如结合BM25和向量检索)后的结果优化。
      使用方式:通过API调用,集成到LangChain、LlamaIndex等框架中。
      优势:
      •简单易用,适合快速集成到现有系统。
      •在英文和多语言任务中表现优异,如提升Hit Rate(命中率)和MRR(平均倒数排名)。
特性 BGE-Rerank Cohere Rerank
开源/商业 开源 商业API
部署方式 可本地部署 云端调用
多语言支持 中英优化 多语言(v3.0)
适用场景 数据敏感、垂直领域 快速集成、多语言优化

# 优化查询扩展

相似语义改写:使用大模型将用户查询改写成多个语义相近的查询,提升召回多样性。

from langchain.retrievers import MultiQueryRetriever
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.llms import Tongyi

import logging

# 设置 LangChain 相关日志级别为 DEBUG
logging.getLogger("langchain.retrievers").setLevel(logging.DEBUG)

# 或启用所有 LangChain 调试信息
logging.basicConfig(level=logging.DEBUG)

# 获取环境变量中的 DASHSCOPE_API_KEY
DASHSCOPE_API_KEY = 'your api key'

llm = Tongyi(model_name="qwen-turbo", dashscope_api_key=DASHSCOPE_API_KEY) # qwen-turbo

# 创建嵌入模型
embeddings = DashScopeEmbeddings(
    model="text-embedding-v1",
    dashscope_api_key=DASHSCOPE_API_KEY,
)

# 加载向量数据库,添加allow_dangerous_deserialization=True参数以允许反序列化
vectorstore = FAISS.load_local("./vector_db", embeddings)

# 创建MultiQueryRetriever
"""
工作原理:
接收用户 query(如“客户经理的考核标准是什么?”)
使用 llm 生成 3-5 个语义变体(如“客户经理绩效考核指标”、“客户经理评估标准”、“客户经理 KPI 要求”)
所有变体 + 原始 query 并行检索
合并去重后返回最相关文档  
"""
retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

# 示例查询
query = "客户经理的考核标准是什么?"
# 执行查询
results = retriever.invoke(query)

# 打印结果
print(f"查询: {query}")
print(f"找到 {len(results)} 个相关文档:")
for i, doc in enumerate(results):
    print(f"\n文档 {i+1}:")
    print(doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# Query改写

  • 为什么需要Query改写?
    RAG的核心在于“检索-生成”。如果第一步“检索”就走偏了,那么后续的“生成”质量也会降低。
    用户提出的问题往往是口语化的、承接上下文的、模糊的,甚至是包含了情绪的。
    而知识库里的文本(切片/Chunks)通常是陈述性的、客观的。
    =>需要一个翻译官的角色,将用户的“口语化查询”转换成“书面化、精确的检索语句”

  • 如何针对不同类型的Query进行改写?
    通过精心设计的Prompt来引导LLM完成这项任务。

RAG的核心在于“检索-生成”。如何通过Query改写让你的RAG检索更强大

  • 上下文依赖型
  • 对比型
  • 模糊指代型
  • 多意图型
  • 反问型
    你也可以通过意图识别,让AI自动分析是哪种类型,并进行改写
import dashscope
import os
import json

# 从环境变量中获取 API Key
dashscope.api_key = "you api key"

# 基于 prompt 生成文本
def get_completion(prompt, model="qwen-turbo-latest"):
    messages = [{"role": "user", "content": prompt}]
    response = dashscope.Generation.call(
        model=model,
        messages=messages,
        result_format='message',
        temperature=0,
    )
    return response.output.choices[0].message.content


# Query改写功能
class QueryRewriter:
    def __init__(self, model="qwen-turbo-latest"):
        self.model = model

    def rewrite_context_dependent_query(self, current_query, conversation_history):
        """上下文依赖型Query改写"""
        instruction = """
            你是一个智能的查询优化助手。请分析用户的当前问题以及前序对话历史,判断当前问题是否依赖于上下文。
            如果依赖,请将当前问题改写成一个独立的、包含所有必要上下文信息的完整问题。
            如果不依赖,直接返回原问题。
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
            
            ### 对话历史 ###
            {conversation_history}
            
            ### 当前问题 ###
            {current_query}
            
            ### 改写后的问题 ###
        """

        return get_completion(prompt, self.model)

    def rewrite_comparative_query(self, query, context_info):
        """对比型Query改写"""
        instruction = """
            你是一个查询分析专家。请分析用户的输入和相关的对话上下文,识别出问题中需要进行比较的多个对象。
            然后,将原始问题改写成一个更明确、更适合在知识库中检索的对比性查询。
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
        
            ### 对话历史/上下文信息 ###
            {context_info}
        
            ### 原始问题 ###
            {query}
        
            ### 改写后的查询 ###
        """

        return get_completion(prompt, self.model)

    def rewrite_ambiguous_reference_query(self, current_query, conversation_history):
        """模糊指代型Query改写"""
        instruction = """
            你是一个消除语言歧义的专家。请分析用户的当前问题和对话历史,找出问题中 "都"、"它"、"这个" 等模糊指代词具体指向的对象。
            然后,将这些指代词替换为明确的对象名称,生成一个清晰、无歧义的新问题。
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
        
            ### 对话历史 ###
            {conversation_history}
        
            ### 当前问题 ###
            {current_query}
        
            ### 改写后的问题 ###
        """

        return get_completion(prompt, self.model)

    def rewrite_multi_intent_query(self, query):
        """多意图型Query改写 - 分解查询"""
        instruction = """
            你是一个任务分解机器人。请将用户的复杂问题分解成多个独立的、可以单独回答的简单问题。以JSON数组格式输出。
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
        
            ### 原始问题 ###
            {query}
        
            ### 分解后的问题列表 ###
            请以JSON数组格式输出,例如:["问题1", "问题2", "问题3"]
        """

        response = get_completion(prompt, self.model)
        try:
            return json.loads(response)
        except:
            return [response]

    def rewrite_rhetorical_query(self, current_query, conversation_history):
        """反问型Query改写"""
        instruction = """
            你是一个沟通理解大师。请分析用户的反问或带有情绪的陈述,识别其背后真实的意图和问题。
            然后,将这个反问改写成一个中立、客观、可以直接用于知识库检索的问题。
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
        
            ### 对话历史 ###
            {conversation_history}
        
            ### 当前问题 ###
            {current_query}
        
            ### 改写后的问题 ###
        """

        return get_completion(prompt, self.model)

    def auto_rewrite_query(self, query, conversation_history="", context_info=""):
        """自动识别Query类型并进行改写"""
        instruction = """
            你是一个智能的查询分析专家。请分析用户的查询,识别其属于以下哪种类型:
            1. 上下文依赖型 - 包含"还有"、"其他"等需要上下文理解的词汇
            2. 对比型 - 包含"哪个"、"比较"、"更"、"哪个更好"、"哪个更"等比较词汇
            3. 模糊指代型 - 包含"它"、"他们"、"都"、"这个"等指代词
            4. 多意图型 - 包含多个独立问题,用"、"或"?"分隔
            5. 反问型 - 包含"不会"、"难道"等反问语气
            说明:如果同时存在多意图型、模糊指代型,优先级为多意图型>模糊指代型
            
            请返回JSON格式的结果:
            {
                "query_type": "查询类型",
                "rewritten_query": "改写后的查询",
                "confidence": "置信度(0-1)"
            }
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
            
            ### 对话历史 ###
            {conversation_history}
            
            ### 上下文信息 ###
            {context_info}
            
            ### 原始查询 ###
            {query}
            
            ### 分析结果 ###
        """

        response = get_completion(prompt, self.model)
        try:
            return json.loads(response)
        except:
            return {
                "query_type": "未知类型",
                "rewritten_query": query,
                "confidence": 0.5
            }


    def auto_rewrite_and_execute(self, query, conversation_history="", context_info=""):
        """自动识别Query类型并进行改写,然后根据类型调用相应的改写方法"""
        # 首先进行自动识别
        result = self.auto_rewrite_query(query, conversation_history, context_info)

        # 根据识别结果调用相应的改写方法
        query_type = result.get('query_type', '')

        if '上下文依赖' in query_type:
            final_result = self.rewrite_context_dependent_query(query, conversation_history)
        elif '对比' in query_type:
            final_result = self.rewrite_comparative_query(query, context_info or conversation_history)
        elif '模糊指代' in query_type:
            final_result = self.rewrite_ambiguous_reference_query(query, conversation_history)
        elif '多意图' in query_type:
            final_result = self.rewrite_multi_intent_query(query)
        elif '反问' in query_type:
            final_result = self.rewrite_rhetorical_query(query, conversation_history)
        else:
            # 对于其他类型,返回自动识别的改写结果
            final_result = result.get('rewritten_query', query)

        return {
            "original_query": query,
            "detected_type": query_type,
            "confidence": result.get('confidence', 0.5),
            "rewritten_query": final_result,
            "auto_rewrite_result": result
        }


def main():
    # 初始化Query改写器
    rewriter = QueryRewriter()
    print("=== Query改写功能使用示例(迪士尼主题乐园) ===\n")

    # 示例1: 上下文依赖型Query
    print("示例1: 上下文依赖型Query")
    conversation_history = """
    用户: "我想了解一下上海迪士尼乐园的最新项目。"
    AI: "上海迪士尼乐园最新推出了'疯狂动物城'主题园区,这里有朱迪警官和尼克狐的互动体验。"
    用户: "这个园区有什么游乐设施?"
    AI: "'疯狂动物城'园区目前有疯狂动物城警察局、朱迪警官训练营和尼克狐的冰淇淋店等设施。"
    """
    current_query = "还有其他设施吗?"

    print(f"对话历史: {conversation_history}")
    print(f"当前查询: {current_query}")

    result = rewriter.rewrite_context_dependent_query(current_query, conversation_history)
    print(f"改写结果: {result}\n")

    # 示例2: 对比型Query
    print("示例2: 对比型Query")
    conversation_history = """
    用户: "我想了解一下上海迪士尼乐园的最新项目。"
    AI: "上海迪士尼乐园最新推出了疯狂动物城主题园区,还有蜘蛛侠主题园区"
    """
    current_query = "哪个游玩的时间比较长,比较有趣"

    print(f"对话历史: {conversation_history}")
    print(f"当前查询: {current_query}")

    result = rewriter.rewrite_comparative_query(current_query, conversation_history)
    print(f"改写结果: {result}\n")

    # 示例3: 模糊指代型Query
    print("示例3: 模糊指代型Query")
    conversation_history = """
    用户: "我想了解一下上海迪士尼乐园和香港迪士尼乐园的烟花表演。"
    AI: "好的,上海迪士尼乐园和香港迪士尼乐园都有精彩的烟花表演。"
    """
    current_query = "都什么时候开始?"

    print(f"对话历史: {conversation_history}")
    print(f"当前查询: {current_query}")

    result = rewriter.rewrite_ambiguous_reference_query(current_query, conversation_history)
    print(f"改写结果: {result}\n")

    # 示例4: 多意图型Query
    print("示例4: 多意图型Query")
    query = "门票多少钱?需要提前预约吗?停车费怎么收?"

    print(f"原始查询: {query}")

    result = rewriter.rewrite_multi_intent_query(query)
    print(f"分解结果: {result}\n")

    # 示例5: 反问型Query
    print("示例5: 反问型Query")
    conversation_history = """
    用户: "你好,我想预订下周六上海迪士尼乐园的门票。"
    AI: "正在为您查询... 查询到下周六的门票已经售罄。"
    用户: "售罄是什么意思?我朋友上周去还能买到当天的票。"
    """
    current_query = "这不会也要提前一个月预订吧?"

    print(f"对话历史: {conversation_history}")
    print(f"当前查询: {current_query}")

    result = rewriter.rewrite_rhetorical_query(current_query, conversation_history)
    print(f"改写结果: {result}\n")

    # 示例6: 自动识别Query类型
    print("示例6: 自动识别Query类型")
    test_queries = [
        "还有其他游乐项目吗?",
        "哪个园区更好玩?",
        "都适合小朋友吗?",
        "有什么餐厅?价格怎么样?",
        "这不会也要排队两小时吧?"
    ]

    for i, query in enumerate(test_queries, 1):
        print(f"测试查询 {i}: {query}")
        result = rewriter.auto_rewrite_query(query)
        print(f"  识别类型: {result['query_type']}")
        print(f"  改写结果: {result['rewritten_query']}")
        print(f"  置信度: {result['confidence']}\n")


if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306

# Query+联网搜索

以迪士尼RAG助手为例,用户Query需要联网的情况都有哪些?

类型 关键词特征 示例查询 原因说明
时效性 最新、今天、现在、实时、当前 上海迪士尼乐园今天开放吗? 需获取当前时间的最新信息
价格信息 多少钱、价格、费用、票价 下周六的门票多少钱? 价格信息经常变动,需实时查询
营业信息 营业时间、开放时间、闭园时间、是否开放迪士尼乐园现在开门吗? 营业状态可能因特殊情况调整
活动信息 活动、表演、演出、节日、庆典 最近有什么特别活动? 活动信息具有时效性和动态性
天气信息 天气、下雨、温度 明天去迪士尼天气怎么样? 天气信息需要实时获取
交通信息 怎么去、交通、地铁、公交 从浦东机场怎么去迪士尼? 交通信息可能因施工、活动等变化
预订信息 预订、预约、购票、订票需要提前多久预订? 预订政策可能随时调整
实时状态 排队、拥挤、人流量现在人多不多? 实时状态需即时获取

以迪士尼RAG助手为例,搭建Query+联网搜索的改写功能,方便后续进行RAG问答

  • 设置联网搜索的使用场景,比如:时效性、天气等
  • 识别逻辑:通过LLM,判断是否需要联网搜索
  • 改写逻辑:通过LLM,对联网搜索的情况,进行查询改写
  • 生成策略(可选):可以提出更详细的平台,以及平台关键词,方便后续进行联网搜索

如果后续可以使用TavilyMCP进行具体的联网搜索,可以引导LLM生成具体的参数

# Query联网搜索改写功能
# 导入依赖库
import dashscope
import os
import json
import re
from datetime import datetime

# 从环境变量中获取 API Key
dashscope.api_key = "你的key"

# 基于 prompt 生成文本
def get_completion(prompt, model="qwen-turbo"):
    messages = [{"role": "user", "content": prompt}]
    response = dashscope.Generation.call(
        model=model,
        messages=messages,
        result_format='message',
        temperature=0,
    )
    return response.output.choices[0].message.content


class WebSearchQueryRewriter:
    def __init__(self, model="qwen-turbo-latest"):
        self.model = model

    def identify_web_search_needs(self, query, conversation_history=""):
        """识别查询是否需要联网搜索"""
        instruction = """
            你是一个智能的查询分析专家。请分析用户的查询,判断是否需要联网搜索来获取最新、最准确的信息。
            
            需要联网搜索的情况包括:
            1. 时效性信息 - 包含"最新"、"今天"、"现在"、"实时"、"当前"等时间相关词汇
            2. 价格信息 - 包含"多少钱"、"价格"、"费用"、"票价"等价格相关词汇
            3. 营业信息 - 包含"营业时间"、"开放时间"、"闭园时间"、"是否开放"等营业状态
            4. 活动信息 - 包含"活动"、"表演"、"演出"、"节日"、"庆典"等动态信息
            5. 天气信息 - 包含"天气"、"下雨"、"温度"等天气相关
            6. 交通信息 - 包含"怎么去"、"交通"、"地铁"、"公交"等交通方式
            7. 预订信息 - 包含"预订"、"预约"、"购票"、"订票"等预订相关
            8. 实时状态 - 包含"排队"、"拥挤"、"人流量"等实时状态
            
            请返回JSON格式:
            {
                "need_web_search": true/false,
                "search_reason": "需要搜索的原因",
                "confidence": "置信度(0-1)"
            }
        """

        prompt = f"""
            ### 指令 ###
            {instruction}
            
            ### 对话历史 ###
            {conversation_history}
            
            ### 用户查询 ###
            {query}
            
            ### 分析结果 ###
        """

        response = get_completion(prompt, self.model)
        try:
            return json.loads(response)
        except:
            return {
                "need_web_search": False,
                "search_reason": "无法解析",
                "confidence": 0.5
            }

    def rewrite_for_web_search(self, query, search_type="general"):
        """为联网搜索改写查询"""
        instruction = """
        你是一个专业的搜索查询优化专家。请将用户的查询改写为更适合搜索引擎检索的形式。
        
        改写技巧:
        1. 添加具体地点 - 如"上海迪士尼乐园"、"香港迪士尼乐园"
        2. 添加时间范围 - 如"2024年"、"今天"、"本周"
        3. 使用关键词组合 - 将长句拆分为关键词
        4. 添加搜索意图 - 明确搜索目的
        5. 去除口语化表达 - 转换为标准搜索词
        6. 添加相关词汇 - 增加同义词或相关词
        
        请返回JSON格式:
        {
            "rewritten_query": "改写后的搜索查询",
            "search_keywords": ["关键词1", "关键词2", "关键词3"],
            "search_intent": "搜索意图",
            "suggested_sources": ["建议搜索的网站类型"]
        }
        """

        prompt = f"""
        ### 指令 ###
        {instruction}
        
        ### 原始查询 ###
        {query}
        
        ### 搜索类型 ###
        {search_type}
        
        ### 改写结果 ###
        """

        response = get_completion(prompt, self.model)
        try:
            return json.loads(response)
        except:
            return {
                "rewritten_query": query,
                "search_keywords": [query],
                "search_intent": "信息查询",
                "suggested_sources": ["官方网站", "旅游网站"]
            }

    def generate_search_strategy(self, query, search_type="general"):
        """生成搜索策略"""
        current_date = datetime.now().strftime("%Y年%m月%d日")
        instruction = f"""
            你是一个搜索策略专家。请为用户的查询制定详细的搜索策略。
            
            当前日期:{current_date}
            
            搜索策略包括:
            1. 主要搜索词 - 核心关键词
            2. 扩展搜索词 - 相关词汇和同义词
            3. 搜索网站 - 推荐的搜索平台
            4. 时间范围 - 具体的搜索时间范围
            
            请返回JSON格式:
            {{
                "primary_keywords": ["主要关键词"],
                "extended_keywords": ["扩展关键词"],
                "search_platforms": ["搜索平台"],
                "time_range": "具体的时间范围"
            }}
            """

        prompt = f"""
            ### 指令 ###
            {instruction}
            
            ### 用户查询 ###
            {query}
            
            ### 搜索类型 ###
            {search_type}
            
            ### 搜索策略 ###
        """

        response = get_completion(prompt, self.model)
        try:
            return json.loads(response)
        except:
            return {
                "primary_keywords": [query],
                "extended_keywords": [],
                "search_platforms": ["百度", "谷歌"],
                "time_range": "最近一周"
            }

    def auto_web_search_rewrite(self, query, conversation_history=""):
        """自动识别并改写为联网搜索查询"""
        # 第一步:识别是否需要联网搜索
        search_analysis = self.identify_web_search_needs(query, conversation_history)

        if not search_analysis.get('need_web_search', False):
            return {
                "need_web_search": False,
                "reason": "查询不需要联网搜索",
                "original_query": query
            }

        # 第二步:改写查询
        rewritten_result = self.rewrite_for_web_search(query)

        # 第三步:生成搜索策略
        search_strategy = self.generate_search_strategy(query)

        return {
            "need_web_search": True,
            "search_reason": search_analysis.get('search_reason', ''),
            "confidence": search_analysis.get('confidence', 0.5),
            "original_query": query,
            "rewritten_query": rewritten_result.get('rewritten_query', query),
            "search_keywords": rewritten_result.get('search_keywords', []),
            "search_intent": rewritten_result.get('search_intent', ''),
            "suggested_sources": rewritten_result.get('suggested_sources', []),
            "search_strategy": search_strategy
        }


def main():
    # 初始化联网搜索Query改写器
    web_searcher = WebSearchQueryRewriter()

    print("=== Query联网搜索识别与改写示例(迪士尼主题乐园) ===\n")

    # 示例1: 时效性信息查询
    print("示例1: 时效性信息查询")
    conversation_history1 = """
        用户: "我想去上海迪士尼乐园玩"
        AI: "上海迪士尼乐园是一个很棒的选择!"
    """
    query1 = "上海迪士尼乐园今天开放吗?现在人多不多?"

    print(f"对话历史: {conversation_history1}")
    print(f"当前查询: {query1}")

    result1 = web_searcher.auto_web_search_rewrite(query1, conversation_history1)

    if result1['need_web_search']:
        print(f"✓ 需要联网搜索")
        print(f"  搜索原因: {result1['search_reason']}")
        print(f"  置信度: {result1['confidence']}")
        print(f"  改写查询: {result1['rewritten_query']}")
        print(f"  搜索关键词: {result1['search_keywords']}")
        print(f"  搜索意图: {result1['search_intent']}")
        print(f"  建议来源: {result1['suggested_sources']}")
        print(f"  搜索策略:")
        print(f"    - 主要关键词: {result1['search_strategy']['primary_keywords']}")
        print(f"    - 扩展关键词: {result1['search_strategy']['extended_keywords']}")
        print(f"    - 搜索平台: {result1['search_strategy']['search_platforms']}")
        print(f"    - 时间范围: {result1['search_strategy']['time_range']}")
    else:
        print(f"✗ 不需要联网搜索")
        print(f"  原因: {result1['reason']}")

    print("\n" + "=" * 60 + "\n")

    # 示例2: 价格和预订信息查询
    print("示例2: 价格和预订信息查询")
    query2 = "下周六的门票多少钱?需要提前多久预订?"

    print(f"当前查询: {query2}")

    result2 = web_searcher.auto_web_search_rewrite(query2)

    if result2['need_web_search']:
        print(f"✓ 需要联网搜索")
        print(f"  搜索原因: {result2['search_reason']}")
        print(f"  置信度: {result2['confidence']}")
        print(f"  改写查询: {result2['rewritten_query']}")
        print(f"  搜索关键词: {result2['search_keywords']}")
        print(f"  搜索意图: {result2['search_intent']}")
        print(f"  建议来源: {result2['suggested_sources']}")
        print(f"  搜索策略:")
        print(f"    - 主要关键词: {result2['search_strategy']['primary_keywords']}")
        print(f"    - 扩展关键词: {result2['search_strategy']['extended_keywords']}")
        print(f"    - 搜索平台: {result2['search_strategy']['search_platforms']}")
        print(f"    - 时间范围: {result2['search_strategy']['time_range']}")
    else:
        print(f"✗ 不需要联网搜索")
        print(f"  原因: {result2['reason']}")


if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263

# 双向改写

将查询改写成文档(Query2Doc)或为文档生成查询(Doc2Query),缓解短文本向量化效果差的问题

  • Query2Doc:将查询改写成文档
    用户查询:“如何提高深度学习模型的训练效率?”

Query2Doc改写: 原始查询较短,可能无法充分表达用户意图。 通过Query2Doc生成一段扩展文档:

提高深度学习模型的训练效率可以从以下几个方面入手:

  1. 使用更高效的优化算法,如AdamW或LAMB。
  2. 采用混合精度训练(Mixed Precision Training),减少显存占用并加速计算。
  3. 使用分布式训练技术,如数据并行或模型并行。
  4. 对数据进行预处理和增强,减少训练时的冗余计算。
  5. 调整学习率调度策略,避免训练过程中的震荡。
  • Doc2Query:为文档生成关联查询
    文档内容: 本文介绍了深度学习模型训练中的优化技巧,包括:
    1.使用AdamW优化器替代传统的SGD。
    2.采用混合精度训练,减少显存占用。
    3.使用分布式训练技术加速大规模模型的训练……
    通过Doc2Query生成一组可能的查询:
  1. 如何选择深度学习模型的优化器?
  2. 混合精度训练有哪些优势?
  3. 分布式训练技术如何加速深度学习?
  4. 如何减少深度学习训练中的显存占用?
  5. 深度学习模型训练的最佳实践是什么?

# 索引扩展

  1. 离散索引扩展:使用关键词抽取、实体识别等技术生成离散索引,与向量检索互补,提升召回准确性。
  2. 连续索引扩展:结合多种向量模型(如OpenAI的Ada、智源的BGE)进行多路召回,取长补短。
  3. 混合索引召回:将BM25等离散索引与向量索引结合,通过Ensemble Retriever实现混合召回,提升召回多样性

提示

BM25是一种经典的文本检索算法,它是TF-IDF(词频-逆文档频率)的改进版本,通过更精细的词频饱和度和文档长度归一化,提升了检索的相关性排序效果。

# 离散索引扩展:

使用关键词抽取、实体识别等技术生成离散索引,与向量检索互补,提升召回准确性。

  • 关键词抽取:从文档中提取出重要的关键词,作为离散索引的一部分,用于补充向量检索的不足。

文档内容:

本文介绍了深度学习模型训练中的优化技巧,包括:
1.使用AdamW优化器替代传统的SGD。
2.采用混合精度训练,减少显存占用。
3.使用分布式训练技术加速大规模模型的训练。

通过关键词抽取技术(如TF-IDF、TextRank)提取出以下关键词:

["深度学习", "模型训练", "优化技巧", "AdamW","混合精度训练", "分布式训练"]

当用户查询“如何优化深度学习模型训练?”时,离散索引中的关键词能够快速匹配到相关文档。

  • 实体识别:从文档中识别出命名实体(如人名、地点、组织等),作为离散索引的一部分,增强检索的精确性。

文档内容:

2023年诺贝尔物理学奖授予了三位科学家,以表彰他们在量子纠缠领域的研究成果。

通过实体识别技术(如SpaCy、BERT-basedNER)提取出以下实体:

["2023年", "诺贝尔物理学奖", "量子纠缠"]

当用户查询“2023年诺贝尔物理学奖的获奖者是谁?”时,离散索引中的实体能够快速匹配到相关文档。

# 混合索引召回:

将离散索引(如关键词、实体)与向量索引结合,通过混合召回策略提升检索效果。

文档内容:

本文介绍了人工智能在医疗领域的应用,包括:
1.使用深度学习技术进行医学影像分析。
2.利用自然语言处理技术提取电子病历中的关键信息。
3.开发智能诊断系统辅助医生决策。

关键词抽取:

["人工智能", "医疗领域", "深度学习", "医学影像分析", "自然语言处理", "电子病历", "智能诊断系统"]
实体识别:
["人工智能", "医疗领域", "深度学习","自然语言处理"]

当用户查询“人工智能在医疗领域的应用有哪些?”时:

  • 离散索引通过关键词和实体匹配到相关文档。
  • 向量索引通过语义相似度匹配到相关文档。
  • 综合两种召回结果,提升检索的准确性和覆盖率。

# Small-to-Big

大规模内容(链接部分): 每篇论文的完整内容作为大规模内容,通过链接与小规模内容关联。

论文1:链接到完整的PDF文档,包含详细的实验和结果。
论文2:链接到完整的PDF文档,包含模型架构和性能分析。

Small-to-Big机制:

小规模内容检索:用户输入查询后,系统首先在小规模内容(如摘要、关键句或段落)中检索匹配的内容。小规模内容通常是通过摘要生成、关键句提取等技术从大规模内容中提取的,并建立索引。
链接到大规模内容:当小规模内容匹配到用户的查询后,系统会通过预定义的链接(如文档ID、URL或指针)找到对应的大规模内容(如完整的文档、文章)。大规模内容包含更详细的上下文信息,为RAG提供丰富的背景知识。
上下文补充:将大规模内容作为RAG系统的上下文输入,结合用户查询和小规模内容,生成更准确和连贯的答案。

# 知识库处理

# 场景1:知识库问题生成与检索优化

  • 当用户提问与知识切片的相似度不高时,能否通过AI为每个知识切片生成可能的问题,通过问题与问题的匹配来提高检索准确度?

TO DO:对知识库进行问题生成,并进行检索优化

  • 自动生成多样化问题:为知识库中的每个知识切片自动生成多种类型、不同难度的问题
    • generate_questions_for_chunk() :为单个知识切片生成基础问题
    • generate_diverse_questions() :生成更多样化的问题(8个)
  • 构建双重检索索引:同时构建基于原文内容和生成问题的BM25检索索引
  • 检索评估:针对两种检索方式,进行详细的评估
# 知识库问题生成与检索优化 - BM25版本
# 导入依赖库
import os
import json
import numpy as np
from openai import OpenAI
import pandas as pd
from datetime import datetime
from rank_bm25 import BM25Okapi
import jieba
import re

# 从环境变量中获取 API Key
DASHSCOPE_API_KEY = "your api key"

# 初始化百炼兼容的 OpenAI 客户端
client = OpenAI(
    api_key=DASHSCOPE_API_KEY,
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

# 预处理AI响应中的JSON格式
def preprocess_json_response(response):
    """预处理AI响应,移除markdown代码块格式"""
    if not response:
        return ""
    
    # 移除markdown代码块格式
    if response.startswith('```json'):
        response = response[7:]  # 移除 ```json
    elif response.startswith('```'):
        response = response[3:]  # 移除 ```
    
    if response.endswith('```'):
        response = response[:-3]  # 移除结尾的 ```
    
    return response.strip()  # 移除首尾空白

# 基于 prompt 生成文本
def get_completion(prompt, model="qwen-turbo-latest"):
    messages = [{"role": "user", "content": prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.7,
    )
    return response.choices[0].message.content

# 文本预处理和分词
def preprocess_text(text):
    """文本预处理和分词"""
    if not text:
        return []
    
    # 移除标点符号和特殊字符
    text = re.sub(r'[^\w\s]', '', text)
    
    # 使用jieba分词
    words = jieba.lcut(text)
    
    # 过滤停用词和短词
    stop_words = {'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这'}
    words = [word for word in words if len(word) > 1 and word not in stop_words]
    
    return words

class KnowledgeBaseOptimizer:
    def __init__(self, model="qwen-turbo-latest"):
        self.model = model
        self.knowledge_base = []
        self.content_bm25 = None
        self.question_bm25 = None
        self.content_documents = []
        self.question_documents = []
        self.content_metadata = []
        self.question_metadata = []
        
    def generate_questions_for_chunk(self, knowledge_chunk, num_questions=5):
        """为单个知识切片生成多样化问题"""
        instruction = """
你是一个专业的问答系统专家。给定的知识内容能回答哪些多样化的问题,这些问题可以:
1. 使用不同的问法(直接问、间接问、对比问等)
2. 避免重复和相似的问题
3. 确保问题不超出知识内容范围

请返回JSON格式:
{
    "questions": [
        {
            "question": "问题内容",
            "question_type": "问题类型(直接问/间接问/对比问/条件问等)",
            "difficulty": "难度等级(简单/中等/困难)"
        }
    ]
}
"""
        
        prompt = f"""
### 指令 ###
{instruction}

### 知识内容 ###
{knowledge_chunk}

### 生成问题数量 ###
{num_questions}

### 生成结果 ###
"""
        
        response = get_completion(prompt, self.model)
        
        # 预处理响应,移除markdown代码块格式
        response = preprocess_json_response(response)
        
        try:
            result = json.loads(response)
            return result.get('questions', [])
        except json.JSONDecodeError as e:
            print(f"JSON解析失败: {e}")
            print(f"AI返回内容: {response[:50]}...")
            # 如果JSON解析失败,返回简单的问题列表
            return [{"question": f"关于{knowledge_chunk[:50]}...的问题", "question_type": "直接问", "keywords": [], "difficulty": "中等"}]
    
    def build_knowledge_index(self, knowledge_base):
        """构建知识库的BM25索引(包括原文和问题)"""
        print("正在构建知识库索引...")
        
        self.knowledge_base = knowledge_base
        content_documents = []
        question_documents = []
        content_metadata = []
        question_metadata = []
        
        for i, chunk in enumerate(knowledge_base):
            # 获取知识切片的内容
            text = chunk.get('content', '')
            if not text.strip():
                continue
                
            # 原文文档
            content_words = preprocess_text(text)
            if content_words:
                content_documents.append(content_words)
                content_metadata.append({
                    "id": chunk.get('id', f"chunk_{i}"),
                    "content": text,
                    "category": chunk.get('category', ''),
                    "chunk": chunk,
                    "type": "content"
                })
            
            # 问题文档(如果存在生成的问题)
            if 'generated_questions' in chunk and chunk['generated_questions']:
                for j, question_data in enumerate(chunk['generated_questions']):
                    question = question_data.get('question', '')
                    if question.strip():
                        # 拼接全文和问题,保持上下文
                        combined_text = f"内容:{text} 问题:{question}"
                        question_words = preprocess_text(combined_text)
                        
                        if question_words:
                            question_documents.append(question_words)
                            question_metadata.append({
                                "id": f"{chunk.get('id', f'chunk_{i}')}_q{j}",
                                "content": question,
                                "combined_content": combined_text,
                                "category": chunk.get('category', ''),
                                "chunk": chunk,
                                "type": "question",
                                "question_data": question_data
                            })
        
        # 创建BM25索引
        if content_documents:
            self.content_bm25 = BM25Okapi(content_documents)
            self.content_documents = content_documents
            self.content_metadata = content_metadata
            print(f"原文索引构建完成,共索引 {len(content_documents)} 个知识切片")
        
        if question_documents:
            self.question_bm25 = BM25Okapi(question_documents)
            self.question_documents = question_documents
            self.question_metadata = question_metadata
            print(f"问题索引构建完成,共索引 {len(question_documents)} 个问题")
        
        if not content_documents and not question_documents:
            print("没有有效的内容可以索引")
    
    def search_similar_chunks(self, query, k=3, search_type="content"):
        """使用BM25搜索相似的内容(原文或问题)"""
        if search_type == "content":
            if not self.content_bm25:
                return []
            bm25 = self.content_bm25
            metadata_store = self.content_metadata
        elif search_type == "question":
            if not self.question_bm25:
                return []
            bm25 = self.question_bm25
            print('question_bm25=', bm25)
            metadata_store = self.question_metadata
        else:
            return []
        
        try:
            # 预处理查询
            query_words = preprocess_text(query)
            if not query_words:
                return []
            
            # 搜索最相似的k个内容
            scores = bm25.get_scores(query_words)
            
            # 获取top-k结果
            top_indices = np.argsort(scores)[::-1][:k]
            
            results = []
            for idx in top_indices:
                if scores[idx] > 0:  # 只返回有相关性的结果
                    metadata = metadata_store[idx]
                    # 将BM25分数转换为0-1范围的相似度
                    similarity = min(1.0, scores[idx] / 10.0)  # 归一化
                    results.append({
                        "metadata": metadata,
                        "score": scores[idx],
                        "similarity": similarity
                    })
            
            return results
            
        except Exception as e:
            print(f"搜索失败: {e}")
            return []
    
    def calculate_similarity(self, query, knowledge_chunk):
        """计算查询与知识切片的相似度(使用BM25)"""
        try:
            query_words = preprocess_text(query)
            chunk_words = preprocess_text(knowledge_chunk)
            
            if not query_words or not chunk_words:
                return 0.0
            
            # 创建临时BM25索引
            temp_bm25 = BM25Okapi([chunk_words])
            scores = temp_bm25.get_scores(query_words)
            
            # 返回最高分数并归一化
            max_score = max(scores) if scores else 0.0
            return min(1.0, max_score / 10.0)
            
        except Exception as e:
            print(f"相似度计算失败: {e}")
            return 0.0
    
    def calculate_question_similarity(self, user_query, generated_questions):
        """计算用户查询与生成问题的相似度"""
        similarities = []
        for question_data in generated_questions:
            question = question_data['question']
            similarity = self.calculate_similarity(user_query, question)
            similarities.append(similarity)
        return max(similarities) if similarities else 0.0
    
    def evaluate_retrieval_methods(self, knowledge_base, test_queries):
        """评估两种检索方法的准确度"""
        # 首先构建知识库索引(包括原文和问题)
        self.build_knowledge_index(knowledge_base)
        
        results = {
            'content_similarity': [],
            'question_similarity': [],
            'improvement': [],
            'content_scores': [],
            'question_scores': [],
            'query_details': []
        }
        
        for i, query_info in enumerate(test_queries):
            user_query = query_info['query']
            correct_chunk = query_info['correct_chunk']
            
            # 方法1:BM25原文检索
            content_results = self.search_similar_chunks(user_query, k=1, search_type="content")
            content_correct = False
            content_score = 0.0
            content_chunk_id = None
            if content_results:
                best_match = content_results[0]['metadata']['chunk']
                content_correct = best_match['content'] == correct_chunk
                content_score = content_results[0]['similarity']
                content_chunk_id = best_match['id']
            
            # 方法2:BM25问题检索
            question_results = self.search_similar_chunks(user_query, k=1, search_type="question")
            question_correct = False
            question_score = 0.0
            question_chunk_id = None
            if question_results:
                best_match = question_results[0]['metadata']['chunk']
                question_correct = best_match['content'] == correct_chunk
                question_score = question_results[0]['similarity']
                question_chunk_id = best_match['id']
            
            results['content_similarity'].append(content_correct)
            results['question_similarity'].append(question_correct)
            results['improvement'].append(question_correct and not content_correct)
            results['content_scores'].append(content_score)
            results['question_scores'].append(question_score)
            
            # 记录查询详情
            results['query_details'].append({
                'query': user_query,
                'content_score': content_score,
                'question_score': question_score,
                'content_correct': content_correct,
                'question_correct': question_correct,
                'score_diff': question_score - content_score,
                'content_chunk_id': content_chunk_id,
                'question_chunk_id': question_chunk_id
            })
        
        return results
    
    def generate_diverse_questions(self, knowledge_chunk, num_questions=8):
        """生成更多样化的问题(更丰富)"""
        instruction = """
你是一个专业的问答系统专家。请为给定的知识内容生成高度多样化的问题,确保:
1. 问题类型多样化:直接问、间接问、对比问、条件问、假设问、推理问等
2. 表达方式多样化:使用不同的句式、词汇、语气
3. 难度层次多样化:简单、中等、困难的问题都要有
4. 角度多样化:从不同角度和维度提问
5. 确保问题不超出知识内容范围

请返回JSON格式:
{
    "questions": [
        {
            "question": "问题内容",
            "question_type": "问题类型",
            "difficulty": "难度等级",
            "perspective": "提问角度",
            "is_answerable": "给出的知识能否回答该问题",
            "answer": "基于该知识的回答"
        }
    ]
}
"""
        
        prompt = f"""
### 指令 ###
{instruction}

### 知识内容 ###
{knowledge_chunk}

### 生成问题数量 ###
{num_questions}

### 生成结果 ###
"""
        
        response = get_completion(prompt, self.model)
        
        # 预处理响应,移除markdown代码块格式
        response = preprocess_json_response(response)
        
        try:
            result = json.loads(response)
            return result.get('questions', [])
        except json.JSONDecodeError as e:
            print(f"多样化问题生成JSON解析失败: {e}")
            print(f"AI返回内容: {response[:200]}...")
            return []

def main():
    # 初始化知识库优化器
    optimizer = KnowledgeBaseOptimizer()
    
    print("=== 知识库问题生成与检索优化示例(BM25版本)- 迪士尼主题乐园 ===\n")
    
    # 示例知识库
    knowledge_base = [
        {
            "id": "kb_001",
            "content": "上海迪士尼乐园位于上海市浦东新区,是中国大陆首座迪士尼主题乐园,于2016年6月16日开园。乐园占地面积390公顷,包含七大主题园区:米奇大街、奇想花园、探险岛、宝藏湾、明日世界、梦幻世界和迪士尼小镇。",
            "category": "基本信息"
        },
        {
            "id": "kb_002", 
            "content": "上海迪士尼乐园的门票价格根据季节和日期有所不同。平日成人票价为399元,周末和节假日为499元。儿童票(1.0-1.4米)平日为299元,周末和节假日为374元。1.0米以下儿童免费入园。",
            "category": "价格信息"
        },
        {
            "id": "kb_003",
            "content": "上海迪士尼乐园的营业时间通常为上午8:00至晚上8:00,但具体时间会根据季节和特殊活动进行调整。建议游客在出发前查看官方网站或APP获取最新的营业时间信息。",
            "category": "营业信息"
        },
        {
            "id": "kb_004",
            "content": "从上海市区到上海迪士尼乐园有多种交通方式:1. 地铁11号线迪士尼站下车;2. 乘坐迪士尼专线巴士;3. 打车约40-60分钟;4. 自驾车可停在乐园停车场,停车费为100元/天。",
            "category": "交通信息"
        },
        {
            "id": "kb_005",
            "content": "上海迪士尼乐园的特色项目包括:创极速光轮(明日世界)、七个小矮人矿山车(梦幻世界)、加勒比海盗:战争之潮(宝藏湾)、翱翔·飞越地平线(探险岛)等。这些项目都有不同的身高和年龄限制。",
            "category": "游乐项目"
        },
        {
            "id": "kb_006",
            "content": "上海迪士尼乐园提供多种餐饮选择,包括米奇大街的皇家宴会厅、奇想花园的漫月轩、宝藏湾的巴波萨烧烤等。园内餐厅价格相对较高,人均消费约150-300元。建议游客可以携带密封包装的零食和水入园。",
            "category": "餐饮信息"
        },
        {
            "id": "kb_007",
            "content": "上海迪士尼乐园的购物体验非常丰富,每个主题园区都有特色商店。米奇大街的M大街购物廊是最大的综合商店,销售各种迪士尼周边商品。建议游客在离园前购买纪念品,避免携带不便。",
            "category": "购物信息"
        },
        {
            "id": "kb_008",
            "content": "上海迪士尼乐园提供多种服务设施,包括婴儿车租赁(50元/天)、轮椅租赁(免费)、储物柜(60元/天)、充电宝租赁等。园内设有多个医疗点和失物招领处,为游客提供便利服务。",
            "category": "服务设施"
        }
    ]
    
    # 示例1: 为知识切片生成问题
    print("示例1: 为知识切片生成多样化问题")
    test_chunk = knowledge_base[0]['content']
    print(f"知识内容: {test_chunk}")
    
    questions = optimizer.generate_questions_for_chunk(test_chunk, num_questions=5)
    print(f"\n生成的5个问题:")
    for i, q in enumerate(questions, 1):
        print(f"  {i}. {q['question']} (类型: {q['question_type']}, 难度: {q['difficulty']})")
    
    print("\n" + "="*60 + "\n")
    
    # 示例2: 生成更多样化的问题
    print("示例2: 生成更多样化的问题(8个)")
    diverse_questions = optimizer.generate_diverse_questions(test_chunk, num_questions=8)
    print(f"\n生成的8个多样化问题:")
    for i, q in enumerate(diverse_questions, 1):
        print(f"  {i}. {q['question']}")
        print(f"     类型: {q['question_type']}, 难度: {q['difficulty']}, 角度: {q['perspective']}, 能否回答: {q['is_answerable']}, 回答的答案:{q['answer']}")
    
    print("\n" + "="*60 + "\n")
    
    # 示例3: 评估检索方法
    print("示例3: 评估两种检索方法的准确度")
    
    # 测试查询 - 设计更有挑战性的问题
    test_queries = [
        {
            "query": "如果我想体验最刺激的过山车,应该去哪个区域?",
            "correct_chunk": knowledge_base[4]['content']
        },
        {
            "query": "什么时间去人比较少?",
            "correct_chunk": knowledge_base[2]['content']
        },
        {
            "query": "可以带食物进去吗?",
            "correct_chunk": knowledge_base[5]['content']
        }
    ]
    
    # 为知识库生成问题
    print('正在为知识库生成问题...')
    for chunk in knowledge_base:
        chunk['generated_questions'] = optimizer.generate_questions_for_chunk(chunk['content'])
        #print("chunk['generated_questions']=", chunk['generated_questions'])
    print('为知识库生成问题完毕')
    
    # 评估检索方法
    results = optimizer.evaluate_retrieval_methods(knowledge_base, test_queries)
    
    print(f"测试查询数量: {len(test_queries)}")
    print(f"BM25原文检索准确率: {sum(results['content_similarity'])/len(results['content_similarity'])*100:.1f}%")
    print(f"BM25问题检索准确率: {sum(results['question_similarity'])/len(results['question_similarity'])*100:.1f}%")
    print(f"问题检索改进的查询数量: {sum(results['improvement'])}")
    
    # 详细分析
    print(f"\n=== 详细分析 ===")
    
    # 按相似度分数差异排序
    sorted_details = sorted(results['query_details'], key=lambda x: x['score_diff'], reverse=True)
    
    print(f"\n问题检索方法表现更好的查询(按分数差异排序):")
    for i, detail in enumerate(sorted_details[:5], 1):
        if detail['score_diff'] > 0:
            print(f"  {i}. 查询: {detail['query']}")
            print(f"     原文检索分数: {detail['content_score']:.3f}")
            print(f"     问题检索分数: {detail['question_score']:.3f}")
            print(f"     分数差异: +{detail['score_diff']:.3f}")
            print(f"     原文检索: {'✓' if detail['content_correct'] else '✗'}")
            print(f"     问题检索: {'✓' if detail['question_correct'] else '✗'}")
    
    print(f"\n原文检索方法表现更好的查询:")
    for i, detail in enumerate(sorted_details[-5:], 1):
        if detail['score_diff'] < 0:
            print(f"  {i}. 查询: {detail['query']}")
            print(f"     原文检索分数: {detail['content_score']:.3f}")
            print(f"     问题检索分数: {detail['question_score']:.3f}")
            print(f"     分数差异: {detail['score_diff']:.3f}")
            print(f"     原文检索: {'✓' if detail['content_correct'] else '✗'}")
            print(f"     问题检索: {'✓' if detail['question_correct'] else '✗'}")
        

if __name__ == "__main__":
    main() 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511

# 场景2:对话知识沉淀

产品上线后每天产生大量对话,如何从这些对话中提取和沉淀有价值的知识,持续丰富知识库?

TO DO:对话知识沉淀
核心功能:
•使用AI模型(通义千问)从对话中提取结构化知识
•支持多种知识类型:事实、问题、流程、注意事项等
•自动识别用户意图和对话摘要
核心函数:
•extract_knowledge_from_conversation() :从单次对话中提取知识
•batch_extract_knowledge() :批量提取知识
•merge_similar_knowledge():使用LLM合并相似知识点

# 对话知识提取与沉淀
# 导入依赖库
import dashscope
import os
import json
from datetime import datetime
from collections import Counter

# 从环境变量中获取 API Key
dashscope.api_key = "you api key"


# 预处理AI响应中的JSON格式
def preprocess_json_response(response):
    """预处理AI响应,移除markdown代码块格式"""
    if not response:
        return ""

    # 移除markdown代码块格式
    if response.startswith('```json'):
        response = response[7:]  # 移除 ```json
    elif response.startswith('```'):
        response = response[3:]  # 移除 ```

    if response.endswith('```'):
        response = response[:-3]  # 移除结尾的 ```

    return response.strip()  # 移除首尾空白


# 基于 prompt 生成文本
def get_completion(prompt, model="qwen-turbo-latest"):
    messages = [{"role": "user", "content": prompt}]
    response = dashscope.Generation.call(
        model=model,
        messages=messages,
        result_format='message',
        temperature=0.3,
    )
    return response.output.choices[0].message.content


class ConversationKnowledgeExtractor:
    def __init__(self, model="qwen-turbo-latest"):
        self.model = model
        self.extracted_knowledge = []
        self.knowledge_frequency = Counter()

    def extract_knowledge_from_conversation(self, conversation):
        """从单次对话中提取知识"""
        instruction = """
        你是一个专业的知识提取专家。请从给定的对话中提取有价值的知识点,包括:
        1. 事实性信息(地点、时间、价格、规则等)
        2. 用户需求和偏好
        3. 常见问题和解答
        4. 操作流程和步骤
        5. 注意事项和提醒

        请返回JSON格式:
        {
            "extracted_knowledge": [
                {
                    "knowledge_type": "知识类型(事实/需求/问题/流程/注意)",
                    "content": "知识内容",
                    "confidence": "置信度(0-1)",
                    "source": "来源(用户/AI/对话)",
                    "keywords": ["关键词1", "关键词2"],
                    "category": "分类"
                }
            ],
            "conversation_summary": "对话摘要",
            "user_intent": "用户意图"
        }
        """

                prompt = f"""
        ### 指令 ###
        {instruction}

        ### 对话内容 ###
        {conversation}

        ### 提取结果 ###
        """

        response = get_completion(prompt, self.model)

        # 预处理响应,移除markdown代码块格式
        response = preprocess_json_response(response)

        try:
            result = json.loads(response)
            return result
        except json.JSONDecodeError as e:
            print(f"对话知识提取JSON解析失败: {e}")
            print(f"AI返回内容: {response[:200]}...")
            return {
                "extracted_knowledge": [],
                "conversation_summary": "无法解析对话",
                "user_intent": "未知"
            }

    def batch_extract_knowledge(self, conversations):
        """批量提取知识"""
        all_knowledge = []

        for i, conversation in enumerate(conversations):
            print(f"正在处理对话 {i + 1}/{len(conversations)}...")

            result = self.extract_knowledge_from_conversation(conversation)
            all_knowledge.extend(result.get('extracted_knowledge', []))

            # 更新频率统计
            for knowledge in result.get('extracted_knowledge', []):
                key = f"{knowledge['knowledge_type']}:{knowledge['content'][:50]}"
                self.knowledge_frequency[key] += 1

        return all_knowledge

    def merge_similar_knowledge(self, knowledge_list):
        """使用LLM合并相似的知识点,过滤掉需求和问题类型"""
        # 过滤掉需求和问题类型的知识,因为它们是临时的、个性化的
        filtered_knowledge = [
            knowledge for knowledge in knowledge_list
            if knowledge.get('knowledge_type') not in ['需求', '问题']
        ]

        print(f"过滤前知识点数量: {len(knowledge_list)}")
        print(f"过滤后知识点数量: {len(filtered_knowledge)}")
        print(f"过滤掉的'需求'和'问题'类型知识点: {len(knowledge_list) - len(filtered_knowledge)}")

        # 按知识类型分组
        knowledge_by_type = {}
        for knowledge in filtered_knowledge:
            knowledge_type = knowledge.get('knowledge_type', '其他')
            if knowledge_type not in knowledge_by_type:
                knowledge_by_type[knowledge_type] = []
            knowledge_by_type[knowledge_type].append(knowledge)

        merged_knowledge = []

        # 对每个知识类型分别进行LLM合并
        for knowledge_type, knowledge_group in knowledge_by_type.items():
            if len(knowledge_group) == 1:
                # 只有一个知识点,直接添加
                merged_knowledge.append(knowledge_group[0])
            else:
                # 多个知识点,使用LLM合并
                merged = self.merge_knowledge_with_llm(knowledge_group, knowledge_type)
                merged_knowledge.append(merged)

        return merged_knowledge

    def merge_knowledge_with_llm(self, knowledge_group, knowledge_type):
        """使用LLM合并同类型的知识组"""
        # 准备知识内容列表
        knowledge_contents = []
        all_keywords = set()
        all_sources = []

        for i, knowledge in enumerate(knowledge_group, 1):
            content = knowledge.get('content', '')
            confidence = knowledge.get('confidence', 0.5)
            keywords = knowledge.get('keywords', [])
            source = knowledge.get('source', '')
            category = knowledge.get('category', '')

            knowledge_contents.append(f"{i}. 内容: {content}")
            knowledge_contents.append(f"   置信度: {confidence}")
            knowledge_contents.append(f"   分类: {category}")
            knowledge_contents.append(f"   来源: {source}")
            knowledge_contents.append(f"   关键词: {', '.join(keywords)}")
            knowledge_contents.append("")

            all_keywords.update(keywords)
            if source and source not in all_sources:
                all_sources.append(source)

        # 构建LLM合并提示
        prompt = f"""
        你是一个专业的知识整理专家。请将以下{knowledge_type}类型的知识点进行智能合并,生成一个更完整、准确的知识点。

        ### 合并要求:
        1. 保留所有重要信息,避免信息丢失
        2. 消除重复内容,整合相似表述
        3. 提高内容的准确性和完整性
        4. 保持逻辑清晰,结构合理
        5. 合并后的置信度取所有知识点中的最高值

        ### 待合并的知识点:
        {chr(10).join(knowledge_contents)}

        ### 请返回JSON格式:
        {{
            "knowledge_type": "{knowledge_type}",
            "content": "合并后的知识内容",
            "confidence": 最高置信度值,
            "keywords": ["合并后的关键词列表"],
            "category": "合并后的分类",
            "sources": ["所有来源"],
            "frequency": {len(knowledge_group)}
        }}

        ### 合并结果:
        """

        response = get_completion(prompt, self.model)

        # 预处理响应
        response = preprocess_json_response(response)

        try:
            result = json.loads(response)
            return result
        except json.JSONDecodeError as e:
            print(f"知识合并JSON解析失败: {e}")
            print(f"AI返回内容: {response[:200]}...")

            # 解析失败时,使用简单的合并策略
            best_knowledge = max(knowledge_group, key=lambda x: x.get('confidence', 0))
            return {
                "knowledge_type": knowledge_type,
                "content": best_knowledge['content'],
                "confidence": best_knowledge.get('confidence', 0.5),
                "frequency": len(knowledge_group),
                "keywords": list(all_keywords),
                "category": best_knowledge['category'],
                "sources": all_sources
            }


def main():
    # 初始化对话知识提取器
    extractor = ConversationKnowledgeExtractor()

    print("=== 对话知识提取与沉淀示例(迪士尼主题乐园) ===\n")

    # 示例对话数据
    sample_conversations = [
        """
        用户: "我想去上海迪士尼乐园玩,门票多少钱?"
        AI: "上海迪士尼乐园的门票价格根据日期有所不同。平日成人票价为399元,周末和节假日为499元。儿童票(1.0-1.4米)平日为299元,周末为374元。1.0米以下儿童免费。"
        用户: "需要提前预订吗?"
        AI: "建议提前预订,特别是周末和节假日,可以通过官方网站或第三方平台预订。"
        用户: "从浦东机场怎么去?"
        AI: "从浦东机场到迪士尼乐园可以乘坐地铁2号线到广兰路站,然后换乘11号线到迪士尼站,全程约1小时。也可以打车,约40分钟。"
        """,
                """
        用户: "迪士尼乐园今天开放吗?"
        AI: "上海迪士尼乐园通常每天开放,营业时间为上午8:00至晚上8:00。但建议您出发前查看官方网站或APP确认具体营业时间,因为可能会因特殊活动或维护调整。"
        用户: "现在人多不多?"
        AI: "人流量会因日期、季节和活动而变化。周末、节假日和寒暑假通常人较多。建议您查看官方APP的实时人流量信息,或关注社交媒体上的游客分享。"
        用户: "有什么必玩的项目?"
        AI: "上海迪士尼乐园的必玩项目包括:创极速光轮(明日世界)、七个小矮人矿山车(梦幻世界)、加勒比海盗:战争之潮(宝藏湾)、翱翔·飞越地平线(探险岛)等。"
        """,
                """
        用户: "带小孩去迪士尼需要注意什么?"
        AI: "带小孩去迪士尼需要注意:1. 身高限制,部分项目有身高要求;2. 准备防晒、雨具等;3. 带些零食和水;4. 建议下载官方APP查看实时排队时间;5. 可以租用婴儿车;6. 注意安全,看管好孩子。"
        用户: "停车费怎么收?"
        AI: "上海迪士尼乐园停车场收费为100元/天,建议提前到达,因为停车场可能会满。也可以选择地铁出行,11号线迪士尼站就在乐园门口。"
        用户: "可以带食物进去吗?"
        AI: "可以携带密封包装的零食和水,但不能带玻璃瓶、酒精饮料等。园内有多个餐厅和小吃店,价格相对较高,建议合理安排。"
        """
    ]

    # 示例1: 从单次对话中提取知识
    print("示例1: 从单次对话中提取知识")
    conversation = sample_conversations[0]
    print(f"对话内容:\n{conversation}")

    extracted = extractor.extract_knowledge_from_conversation(conversation)
    print(f"\n提取的知识点:")
    for i, knowledge in enumerate(extracted['extracted_knowledge'], 1):
        print(f"  {i}. 类型: {knowledge['knowledge_type']}")
        print(f"     内容: {knowledge['content']}")
        print(f"     置信度: {knowledge['confidence']}")
        print(f"     分类: {knowledge['category']}")

    print(f"\n对话摘要: {extracted['conversation_summary']}")
    print(f"用户意图: {extracted['user_intent']}")

    print("\n" + "=" * 60 + "\n")

    # 示例2: 批量提取知识
    print("示例2: 批量提取知识")
    all_knowledge = extractor.batch_extract_knowledge(sample_conversations)
    print(f"总共提取了 {len(all_knowledge)} 个知识点")

    # 显示所有知识点
    print(f"\n所有知识点:")
    for key, count in extractor.knowledge_frequency.most_common():
        print(f"  {key}: {count}次")

    print("\n" + "=" * 60 + "\n")

    # 示例3: 合并相似知识
    print("示例3: 合并相似知识")
    merged_knowledge = extractor.merge_similar_knowledge(all_knowledge)
    print(f"合并后剩余 {len(merged_knowledge)} 个知识点")

    print(f"\n合并后的知识点:")
    for i, knowledge in enumerate(merged_knowledge, 1):
        print(f"  {i}. 类型: {knowledge.get('knowledge_type', '未知')}")
        print(f"     内容: {knowledge['content']}")
        print(f"     频率: {knowledge.get('frequency', 1)}次")
        print(f"     置信度: {knowledge.get('confidence', 0.5)}")
        print(f"     分类: {knowledge.get('category', '未知')}")
        print(f"     关键词: {knowledge.get('keywords', [])}")
        print(f"     来源: {knowledge.get('sources', [])}")
        print()

    print("\n" + "=" * 60 + "\n")


if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316

# 场景3:知识库健康度检查

如何对整个知识库进行健康度检查,找出缺少的知识、过期的知识、冲突的知识,确保知识库的质量和可靠性。

TO DO:知识库健康度检查
核心功能:
•完整性检查:评估知识库是否覆盖用户的主要查询需求
•时效性检查:识别过期或需要更新的知识内容
•一致性检查:发现知识库中的冲突和矛盾信息
•综合评分:提供量化的健康度评分和改进建议

# 知识库健康度检查
# 导入依赖库
import dashscope
import os
import json
import re
from datetime import datetime

dashscope.api_key = "your api key"

# 基于 prompt 生成文本
def get_completion(prompt, model="qwen-turbo"):
    messages = [{"role": "user", "content": prompt}]
    response = dashscope.Generation.call(
        model=model,
        messages=messages,
        result_format='message',
        temperature=0.3,
    )
    return response.output.choices[0].message.content

class KnowledgeBaseHealthChecker:
    def __init__(self, model="qwen-turbo"):
        self.model = model
        self.health_report = {}

    def check_missing_knowledge(self, knowledge_base, test_queries):
        """使用LLM检查缺少的知识"""
        instruction = """
        你是一个知识库完整性检查专家。请分析给定的测试查询和知识库内容,判断知识库中是否缺少相关的知识。
        
        检查标准:
        1. 查询是否能在知识库中找到相关答案
        2. 知识是否完整、准确
        3. 是否覆盖了用户的主要需求
        4. 是否存在知识空白
        
        请返回JSON格式:
        {
            "missing_knowledge": [
                {
                    "query": "测试查询",
                    "missing_aspect": "缺少的知识方面",
                    "importance": "重要性(高/中/低)",
                    "suggested_content": "建议的知识内容",
                    "category": "知识分类"
                }
            ],
            "coverage_score": "覆盖率评分(0-1)",
            "completeness_analysis": "完整性分析"
        }
        """

        # 构建知识库内容摘要
        knowledge_summary = []
        for chunk in knowledge_base:
            knowledge_summary.append(f"ID: {chunk.get('id', 'unknown')} - {chunk.get('content', '')}")

        knowledge_text = "\n".join(knowledge_summary)

        # 构建测试查询列表
        queries_text = []
        for query_info in test_queries:
            query = query_info['query']
            expected = query_info.get('expected_answer', '')
            queries_text.append(f"查询: {query} | 期望答案: {expected}")

        queries_text = "\n".join(queries_text)

        prompt = f"""
        ### 指令 ###
        {instruction}
        
        ### 知识库内容 ###
        {knowledge_text}
        
        ### 测试查询 ###
        {queries_text}
        
        ### 分析结果 ###
        """

        try:
            response = get_completion(prompt, self.model)

            # 预处理响应,移除markdown代码块格式
            if response.startswith('```json'):
                response = response[7:]
            elif response.startswith('```'):
                response = response[3:]
            if response.endswith('```'):
                response = response[:-3]

            result = json.loads(response.strip())
            return result

        except Exception as e:
            print(f"LLM检查缺少知识失败: {e}")
            return None

    def check_outdated_knowledge(self, knowledge_base):
        """使用LLM检查过期的知识"""
        instruction = """
        你是一个知识时效性检查专家。请分析给定的知识内容,判断是否存在过期或需要更新的信息。
        
        检查标准:
        1. 时间相关信息是否过期(年份、日期、时间范围)
        2. 价格信息是否最新(价格、费用、票价等)
        3. 政策规则是否更新(政策、规定、规则等)
        4. 活动信息是否有效(活动、节日、特殊安排等)
        5. 联系方式是否准确(电话、地址、网址等)
        6. 技术信息是否过时(版本、技术标准等)
        
        请返回JSON格式:
        {
            "outdated_knowledge": [
                {
                    "chunk_id": "知识切片ID",
                    "content": "知识内容",
                    "outdated_aspect": "过期方面",
                    "severity": "严重程度(高/中/低)",
                    "suggested_update": "建议更新内容",
                    "last_verified": "最后验证时间"
                }
            ],
            "freshness_score": "新鲜度评分(0-1)",
            "update_recommendations": "更新建议"
        }
        """

        # 构建知识库内容
        knowledge_text = []
        for chunk in knowledge_base:
            content = chunk.get('content', '')
            chunk_id = chunk.get('id', 'unknown')
            last_updated = chunk.get('last_updated', 'unknown')
            knowledge_text.append(f"ID: {chunk_id} | 更新时间: {last_updated} | 内容: {content}")

        knowledge_text = "\n".join(knowledge_text)

        prompt = f"""
        ### 指令 ###
        {instruction}
        
        ### 知识库内容 ###
        {knowledge_text}
        
        ### 当前时间 ###
        {datetime.now().strftime('%Y年%m月%d日')}
        
        ### 分析结果 ###
        """

        try:
            response = get_completion(prompt, self.model)

            # 预处理响应,移除markdown代码块格式
            if response.startswith('```json'):
                response = response[7:]
            elif response.startswith('```'):
                response = response[3:]
            if response.endswith('```'):
                response = response[:-3]

            result = json.loads(response.strip())
            return result

        except Exception as e:
            print(f"LLM检查过期知识失败: {e}")
            return None

    def check_conflicting_knowledge(self, knowledge_base):
        """使用LLM检查冲突的知识"""
        instruction = """
        你是一个知识一致性检查专家。请分析给定的知识库,找出可能存在冲突或矛盾的信息。
        
        检查标准:
        1. 同一主题的不同说法(地点、名称、描述等)
        2. 价格信息的差异(价格、费用、收费标准等)
        3. 时间信息的不一致(营业时间、开放时间、活动时间等)
        4. 规则政策的冲突(规定、政策、要求等)
        5. 操作流程的差异(步骤、方法、流程等)
        6. 联系方式的差异(地址、电话、网址等)
        
        请返回JSON格式:
        {
            "conflicting_knowledge": [
                {
                    "conflict_type": "冲突类型",
                    "chunk_ids": ["相关切片ID"],
                    "conflicting_content": ["冲突内容"],
                    "severity": "严重程度(高/中/低)",
                    "resolution_suggestion": "解决建议"
                }
            ],
            "consistency_score": "一致性评分(0-1)",
            "conflict_analysis": "冲突分析"
        }
        """

        # 构建知识库内容
        knowledge_text = []
        for chunk in knowledge_base:
            content = chunk.get('content', '')
            chunk_id = chunk.get('id', 'unknown')
            knowledge_text.append(f"ID: {chunk_id} | 内容: {content}")

        knowledge_text = "\n".join(knowledge_text)

        prompt = f"""
        ### 指令 ###
        {instruction}
        
        ### 知识库内容 ###
        {knowledge_text}
        
        ### 分析结果 ###
        """

        try:
            response = get_completion(prompt, self.model)

            # 预处理响应,移除markdown代码块格式
            if response.startswith('```json'):
                response = response[7:]
            elif response.startswith('```'):
                response = response[3:]
            if response.endswith('```'):
                response = response[:-3]

            result = json.loads(response.strip())
            return result

        except Exception as e:
            print(f"LLM检查冲突知识失败: {e}")
            return None

    def calculate_overall_health_score(self, missing_result, outdated_result, conflicting_result):
        """计算整体健康度评分"""
        coverage_score = missing_result.get('coverage_score', 0)
        freshness_score = outdated_result.get('freshness_score', 0)
        consistency_score = conflicting_result.get('consistency_score', 0)

        # 加权计算
        overall_score = (
                coverage_score * 0.4 +  # 覆盖率权重40%
                freshness_score * 0.3 +  # 新鲜度权重30%
                consistency_score * 0.3  # 一致性权重30%
        )

        return overall_score


    def generate_health_report(self, knowledge_base, test_queries):
        """生成完整的健康度报告"""
        print("正在检查知识库健康度...")

        # 1. 检查缺少的知识
        print("1. 检查缺少的知识...")
        missing_result = self.check_missing_knowledge(knowledge_base, test_queries)

        # 2. 检查过期的知识
        print("2. 检查过期的知识...")
        outdated_result = self.check_outdated_knowledge(knowledge_base)

        # 3. 检查冲突的知识
        print("3. 检查冲突的知识...")
        conflicting_result = self.check_conflicting_knowledge(knowledge_base)

        # 4. 计算整体健康度
        overall_score = self.calculate_overall_health_score(missing_result, outdated_result, conflicting_result)

        # 5. 生成报告
        report = {
            "overall_health_score": overall_score,
            "health_level": self.get_health_level(overall_score),
            "missing_knowledge": missing_result,
            "outdated_knowledge": outdated_result,
            "conflicting_knowledge": conflicting_result,
            "recommendations": self.generate_recommendations(missing_result, outdated_result, conflicting_result),
            "check_date": datetime.now().isoformat()
        }

        return report

    def get_health_level(self, score):
        """根据评分确定健康等级"""
        if score >= 0.8:
            return "优秀"
        elif score >= 0.6:
            return "良好"
        elif score >= 0.4:
            return "一般"
        else:
            return "需要改进"

    def generate_recommendations(self, missing_result, outdated_result, conflicting_result):
        """生成改进建议"""
        recommendations = []

        # 基于缺少知识的建议
        missing_count = len(missing_result.get('missing_knowledge', []))
        if missing_count > 0:
            recommendations.append(f"补充{missing_count}个缺少的知识点,提高覆盖率")

        # 基于过期知识的建议
        outdated_count = len(outdated_result.get('outdated_knowledge', []))
        if outdated_count > 0:
            recommendations.append(f"更新{outdated_count}个过期知识点,确保信息时效性")

        # 基于冲突知识的建议
        conflicting_count = len(conflicting_result.get('conflicting_knowledge', []))
        if conflicting_count > 0:
            recommendations.append(f"解决{conflicting_count}个知识冲突,提高一致性")

        if not recommendations:
            recommendations.append("知识库状态良好,建议定期维护")

        return recommendations


def main():
    # 初始化知识库健康度检查器
    checker = KnowledgeBaseHealthChecker()

    print("=== 知识库健康度检查示例(迪士尼主题乐园) ===\n")

    # 示例知识库(包含一些故意的问题)
    knowledge_base = [
        {
            "id": "kb_001",
            "content": "上海迪士尼乐园位于上海市浦东新区,是中国大陆首座迪士尼主题乐园,于2016年6月16日开园。乐园占地面积390公顷,包含七大主题园区。",
            "last_updated": "2024-01-15"
        },
        {
            "id": "kb_002",
            "content": "上海迪士尼乐园的门票价格:平日成人票价为399元,周末和节假日为499元。儿童票平日为299元,周末为374元。",
            "last_updated": "2023-12-01"  # 故意设置为较旧的时间
        },
        {
            "id": "kb_003",
            "content": "上海迪士尼乐园门票价格:成人票平日350元,周末450元。儿童票平日250元,周末350元。",  # 故意设置冲突的价格
            "last_updated": "2024-02-01"
        },
        {
            "id": "kb_004",
            "content": "上海迪士尼乐园营业时间为上午8:00至晚上8:00,全年无休。",
            "last_updated": "2024-01-20"
        },
        {
            "id": "kb_005",
            "content": "从上海市区到迪士尼乐园可以乘坐地铁11号线到迪士尼站,或乘坐迪士尼专线巴士。",
            "last_updated": "2024-01-10"
        }
    ]

    # 测试查询
    test_queries = [
        {
            "query": "上海迪士尼乐园在哪里?",
            "expected_answer": "浦东新区"
        },
        {
            "query": "门票多少钱?",
            "expected_answer": "价格信息"
        },
        {
            "query": "营业时间是什么?",
            "expected_answer": "8:00-20:00"
        },
        {
            "query": "怎么去迪士尼?",
            "expected_answer": "地铁11号线"
        },
        {
            "query": "有什么特别活动?",  # 知识库中没有相关信息
            "expected_answer": "活动信息"
        },
        {
            "query": "停车费是多少?",  # 知识库中没有相关信息
            "expected_answer": "停车费信息"
        }
    ]

    # 生成健康度报告
    health_report = checker.generate_health_report(knowledge_base, test_queries)

    # 显示报告
    print("=== 知识库健康度报告 ===\n")

    print(f"整体健康度评分: {health_report['overall_health_score']:.2f}")
    print(f"健康等级: {health_report['health_level']}")
    print(f"检查时间: {health_report['check_date']}")

    print("\n" + "=" * 60 + "\n")

    # 详细分析
    print("=== 详细分析 ===\n")

    # 1. 缺少的知识
    print("1. 缺少的知识分析:")
    missing = health_report['missing_knowledge']
    print(f"   覆盖率: {health_report['missing_knowledge']['coverage_score'] * 100:.1f}%")
    print(f"   缺少知识点数量: {len(missing['missing_knowledge'])}")
    for i, item in enumerate(missing['missing_knowledge'][:3], 1):
        print(f"   {i}. 查询: {item['query']}")
        print(f"      缺少方面: {item['missing_aspect']}")
        print(f"      重要性: {item['importance']}")

    print("\n" + "-" * 40 + "\n")

    # 2. 过期的知识
    print("2. 过期的知识分析:")
    outdated = health_report['outdated_knowledge']
    print(f"   新鲜度评分: {outdated['freshness_score']:.2f}")
    print(f"   过期知识点数量: {len(outdated['outdated_knowledge'])}")
    for i, item in enumerate(outdated['outdated_knowledge'][:3], 1):
        print(f"   {i}. 切片ID: {item['chunk_id']}")
        print(f"      过期方面: {item['outdated_aspect']}")
        print(f"      严重程度: {item['severity']}")

    print("\n" + "-" * 40 + "\n")

    # 3. 冲突的知识
    print("3. 冲突的知识分析:")
    conflicting = health_report['conflicting_knowledge']
    print(f"   一致性评分: {conflicting['consistency_score']:.2f}")
    print(f"   冲突数量: {len(conflicting['conflicting_knowledge'])}")
    for i, item in enumerate(conflicting['conflicting_knowledge'][:3], 1):
        print(f"   {i}. 冲突类型: {item['conflict_type']}")
        print(f"      相关切片: {item['chunk_ids']}")
        print(f"      严重程度: {item['severity']}")

    print("\n" + "=" * 60 + "\n")

    # 改进建议
    print("=== 改进建议 ===\n")
    for i, recommendation in enumerate(health_report['recommendations'], 1):
        print(f"{i}. {recommendation}")


if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443

# 场景4:知识库版本管理与性能比较

如何对知识库进行版本管理,实现回归测试、上线前验收,并比较不同版本的知识库性能,选择最优版本。

TO DO:知识库版本管理与性能比较
核心功能:
•版本创建:为知识库创建带描述和统计信息的版本
•哈希值计算:使用MD5计算版本的唯一标识
•统计信息:记录知识切片数量、内容长度、分类分布等
•版本比较:比较两个版本的差异和变化

# 知识库版本管理与性能比较
# 导入依赖库
import dashscope
import os
import json
import re
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import pandas as pd
import numpy as np
import faiss
from openai import OpenAI

dashscope.api_key = "your api key"

# 初始化百炼兼容的 OpenAI 客户端
client = OpenAI(
    api_key=dashscope.api_key,
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

# 全局配置
TEXT_EMBEDDING_MODEL = "text-embedding-v4"
TEXT_EMBEDDING_DIM = 1024

# 基于 prompt 生成文本
def get_completion(prompt, model="qwen-turbo"):
    messages = [{"role": "user", "content": prompt}]
    response = dashscope.Generation.call(
        model=model,
        messages=messages,
        result_format='message',
        temperature=0.3,
    )
    return response.output.choices[0].message.content

def get_text_embedding(text):
    """获取文本的 Embedding"""
    response = client.embeddings.create(
        model=TEXT_EMBEDDING_MODEL,
        input=text,
        dimensions=TEXT_EMBEDDING_DIM
    )
    return response.data[0].embedding

class KnowledgeBaseVersionManager:
    def __init__(self, model="qwen-turbo"):
        self.model = model
        self.versions = {}


    def create_version(self, knowledge_base, version_name, description=""):
        """创建知识库版本"""
        # 构建向量索引
        metadata_store, text_index = self.build_vector_index(knowledge_base)

        version_info = {
            "version_name": version_name,
            "description": description,
            "created_date": datetime.now().isoformat(),
            "knowledge_base": knowledge_base,
            "metadata_store": metadata_store,
            "text_index": text_index,
            "statistics": self.calculate_version_statistics(knowledge_base)
        }

        self.versions[version_name] = version_info
        return version_info

    def build_vector_index(self, knowledge_base):
        """构建向量索引"""
        metadata_store = [] # 用于保存每个文本块的元数据(如原始内容、ID 等),后续检索时可通过 ID 找回原文。
        text_vectors = [] # 用于保存每个文本块对应的向量(embedding),类型是浮点数列表或 NumPy 数组。

        for i, chunk in enumerate(knowledge_base):
            content = chunk.get('content', '')
            if not content.strip():
                continue

            # FAISS 用整数 id 快速索引,而 chunk_id 可以保留业务语义
            metadata = {
                "id": i, # 使用循环索引作为内部唯一 ID(注意:这个 id 是整数,用于 FAISS 的 add_with_ids
                "content": content,  # 保存原始文本,便于后续生成答案时引用。
                "chunk_id": chunk.get('id', f'chunk_{i}')  # 优先使用 chunk 自带的 id(如文档段落 ID),如果没有则自动生成 chunk_0, chunk_1 等。
            }

            # 获取文本embedding
            vector = get_text_embedding(content)
            text_vectors.append(vector)
            metadata_store.append(metadata)

        # 创建FAISS索引,适合中小规模数据(< 10 万条),简单高效
        text_index = faiss.IndexFlatL2(TEXT_EMBEDDING_DIM) # 创建一个精确的 L2 距离(欧氏距离)暴力搜索索引
        #包装原始索引,使其支持自定义 ID(而不是默认从 0 开始递增)。这样在搜索时能直接返回你指定的 id(这里是 i)。
        text_index_map = faiss.IndexIDMap(text_index)

        if text_vectors:
            # 提取所有元数据中的 id(即 i 的列表,如 [0, 1, 2, ...])
            text_ids = [m["id"] for m in metadata_store]
            # FAISS 要求输入是 float32 类型的 NumPy 二维数组
            text_index_map.add_with_ids(np.array(text_vectors).astype('float32'), np.array(text_ids))

        return metadata_store, text_index_map

    def calculate_version_statistics(self, knowledge_base):
        """计算版本统计信息"""
        total_chunks = len(knowledge_base)
        total_content_length = sum(len(chunk.get('content', '')) for chunk in knowledge_base)

        return {
            "total_chunks": total_chunks,
            "total_content_length": total_content_length,
            "average_chunk_length": total_content_length / total_chunks if total_chunks > 0 else 0
        }

    def compare_versions(self, version1_name, version2_name):
        """比较两个版本的差异"""
        if version1_name not in self.versions or version2_name not in self.versions:
            return {"error": "版本不存在"}

        v1 = self.versions[version1_name]
        v2 = self.versions[version2_name]

        kb1 = v1['knowledge_base']
        kb2 = v2['knowledge_base']

        comparison = {
            "version1": version1_name,
            "version2": version2_name,
            "comparison_date": datetime.now().isoformat(),
            "changes": self.detect_changes(kb1, kb2),
            "statistics_comparison": self.compare_statistics(v1['statistics'], v2['statistics'])
        }

        return comparison


    def detect_changes(self, kb1, kb2):
        """检测知识库变化"""
        changes = {
            "added_chunks": [],
            "removed_chunks": [],
            "modified_chunks": [],
            "unchanged_chunks": []
        }

        # 创建ID映射
        # 将列表转为字典,以 id 为键,实现 O(1) 快速查找
        kb1_dict = {chunk.get('id'): chunk for chunk in kb1}
        kb2_dict = {chunk.get('id'): chunk for chunk in kb2}

        # 检测新增和删除
        kb1_ids = set(kb1_dict.keys())
        kb2_ids = set(kb2_dict.keys())

        added_ids = kb2_ids - kb1_ids
        removed_ids = kb1_ids - kb2_ids
        common_ids = kb1_ids & kb2_ids

        # 记录新增的知识切片
        for chunk_id in added_ids:
            changes["added_chunks"].append({
                "id": chunk_id,
                "content": kb2_dict[chunk_id].get('content', '')
            })

        # 记录删除的知识切片
        for chunk_id in removed_ids:
            changes["removed_chunks"].append({
                "id": chunk_id,
                "content": kb1_dict[chunk_id].get('content', '')
            })

        # 检测修改的知识切片
        for chunk_id in common_ids:
            chunk1 = kb1_dict[chunk_id]
            chunk2 = kb2_dict[chunk_id]

            if chunk1.get('content') != chunk2.get('content'):
                changes["modified_chunks"].append({
                    "id": chunk_id,
                    "old_content": chunk1.get('content', ''),
                    "new_content": chunk2.get('content', '')
                })
            else:
                changes["unchanged_chunks"].append(chunk_id)

        return changes

    def compare_statistics(self, stats1, stats2):
        """比较统计信息"""
        comparison = {}

        for key in stats1.keys():
            if key in stats2:
                if isinstance(stats1[key], (int, float)):
                    comparison[key] = {
                        "version1": stats1[key],
                        "version2": stats2[key],
                        "difference": stats2[key] - stats1[key],
                        "percentage_change": ((stats2[key] - stats1[key]) / stats1[key] * 100) if stats1[
                                                                                                      key] != 0 else 0
                    }
                elif isinstance(stats1[key], dict):
                    comparison[key] = self.compare_dict_statistics(stats1[key], stats2[key])

        return comparison

    def compare_dict_statistics(self, dict1, dict2):
        """比较字典类型的统计信息"""
        comparison = {}
        all_keys = set(dict1.keys()) | set(dict2.keys())

        for key in all_keys:
            val1 = dict1.get(key, 0)
            val2 = dict2.get(key, 0)
            comparison[key] = {
                "version1": val1,
                "version2": val2,
                "difference": val2 - val1
            }

        return comparison

    def evaluate_version_performance(self, version_name, test_queries):
        """评估版本性能"""
        if version_name not in self.versions:
            return {"error": "版本不存在"}

        performance_metrics = {
            "version_name": version_name,
            "evaluation_date": datetime.now().isoformat(),
            "query_results": [],
            "overall_metrics": {}
        }

        total_queries = len(test_queries)
        correct_answers = 0
        response_times = []

        for query_info in test_queries:
            query = query_info['query']
            expected_answer = query_info.get('expected_answer', '')

            # 使用embedding检索
            start_time = datetime.now()
            retrieved_chunks = self.retrieve_relevant_chunks(query, version_name)
            end_time = datetime.now()

            response_time = (end_time - start_time).total_seconds()
            response_times.append(response_time)

            # 评估检索质量
            is_correct = self.evaluate_retrieval_quality(query, retrieved_chunks, expected_answer)
            if is_correct:
                correct_answers += 1

            performance_metrics["query_results"].append({
                "query": query,
                "retrieved_chunks": len(retrieved_chunks),
                "response_time": response_time,
                "is_correct": is_correct
            })

        # 计算整体指标
        accuracy = correct_answers / total_queries if total_queries > 0 else 0
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0

        performance_metrics["overall_metrics"] = {
            "accuracy": accuracy,
            "avg_response_time": avg_response_time,
            "total_queries": total_queries,
            "correct_answers": correct_answers
        }

        return performance_metrics

    def retrieve_relevant_chunks(self, query, version_name, k=3):
        """使用embedding和faiss检索相关知识切片"""
        if version_name not in self.versions:
            return []

        version_info = self.versions[version_name]
        metadata_store = version_info['metadata_store']
        text_index = version_info['text_index']

        # 获取查询的embedding
        query_vector = np.array([get_text_embedding(query)]).astype('float32')

        # 使用faiss进行检索
        distances, indices = text_index.search(query_vector, k)

        relevant_chunks = []
        for i, doc_id in enumerate(indices[0]):
            if doc_id != -1:  # faiss返回-1表示没有找到匹配
                # 通过ID在元数据中查找
                match = next((item for item in metadata_store if item["id"] == doc_id), None)
                if match:
                    # 构造返回的知识切片格式
                    chunk = {
                        "id": match["chunk_id"],
                        "content": match["content"],
                        "similarity_score": 1.0 / (1.0 + distances[0][i])  # 将距离转换为相似度
                    }
                    relevant_chunks.append(chunk)

        return relevant_chunks

    def evaluate_retrieval_quality(self, query, retrieved_chunks, expected_answer):
        """评估检索质量"""
        if not retrieved_chunks:
            return False

        # 简化的质量评估
        for chunk in retrieved_chunks:
            content = chunk.get('content', '').lower()
            if expected_answer.lower() in content:
                return True

        return False

    def compare_version_performance(self, version1_name, version2_name, test_queries):
        """比较两个版本的性能"""
        perf1 = self.evaluate_version_performance(version1_name, test_queries)
        perf2 = self.evaluate_version_performance(version2_name, test_queries)

        if "error" in perf1 or "error" in perf2:
            return {"error": "版本评估失败"}

        comparison = {
            "version1": version1_name,
            "version2": version2_name,
            "comparison_date": datetime.now().isoformat(),
            "performance_comparison": {
                "accuracy": {
                    "version1": perf1["overall_metrics"]["accuracy"],
                    "version2": perf2["overall_metrics"]["accuracy"],
                    "improvement": perf2["overall_metrics"]["accuracy"] - perf1["overall_metrics"]["accuracy"]
                },
                "response_time": {
                    "version1": perf1["overall_metrics"]["avg_response_time"],
                    "version2": perf2["overall_metrics"]["avg_response_time"],
                    "improvement": perf1["overall_metrics"]["avg_response_time"] - perf2["overall_metrics"][
                        "avg_response_time"]
                }
            },
            "recommendation": self.generate_performance_recommendation(perf1, perf2)
        }

        return comparison

    def generate_performance_recommendation(self, perf1, perf2):
        """生成性能建议"""
        acc1 = perf1["overall_metrics"]["accuracy"]
        acc2 = perf2["overall_metrics"]["accuracy"]
        time1 = perf1["overall_metrics"]["avg_response_time"]
        time2 = perf2["overall_metrics"]["avg_response_time"]

        if acc2 > acc1 and time2 <= time1:
            return f"推荐使用版本2,准确率提升{(acc2 - acc1) * 100:.1f}%,响应时间{'提升' if time2 < time1 else '相当'}"
        elif acc2 > acc1 and time2 > time1:
            return f"版本2准确率更高但响应时间较长,需要权衡"
        elif acc2 < acc1 and time2 < time1:
            return f"版本2响应更快但准确率较低,需要权衡"
        else:
            return f"推荐使用版本1,性能更优"

    def generate_regression_test(self, version_name, test_queries):
        """生成回归测试"""
        if version_name not in self.versions:
            return {"error": "版本不存在"}

        regression_results = {
            "version_name": version_name,
            "test_date": datetime.now().isoformat(),
            "test_results": [],
            "pass_rate": 0
        }

        passed_tests = 0
        total_tests = len(test_queries)

        for query_info in test_queries:
            query = query_info['query']
            expected_answer = query_info.get('expected_answer', '')

            # 执行测试
            retrieved_chunks = self.retrieve_relevant_chunks(query, version_name)
            is_passed = self.evaluate_retrieval_quality(query, retrieved_chunks, expected_answer)

            if is_passed:
                passed_tests += 1

            regression_results["test_results"].append({
                "query": query,
                "expected": expected_answer,
                "retrieved": len(retrieved_chunks),
                "passed": is_passed
            })

        regression_results["pass_rate"] = passed_tests / total_tests if total_tests > 0 else 0

        return regression_results


def main():
    # 初始化版本管理器
    version_manager = KnowledgeBaseVersionManager()

    print("=== 知识库版本管理与性能比较示例(迪士尼主题乐园) ===\n")

    # 创建版本1(基础版本)
    knowledge_base_v1 = [
        {
            "id": "kb_001",
            "content": "上海迪士尼乐园位于上海市浦东新区,是中国大陆首座迪士尼主题乐园,于2016年6月16日开园。"
        },
        {
            "id": "kb_002",
            "content": "上海迪士尼乐园的门票价格:平日成人票价为399元,周末和节假日为499元。"
        },
        {
            "id": "kb_003",
            "content": "上海迪士尼乐园营业时间为上午8:00至晚上8:00。"
        }
    ]

    # 创建版本2(增强版本)
    knowledge_base_v2 = [
        {
            "id": "kb_001",
            "content": "上海迪士尼乐园位于上海市浦东新区,是中国大陆首座迪士尼主题乐园,于2016年6月16日开园。乐园占地面积390公顷,包含七大主题园区。"
        },
        {
            "id": "kb_002",
            "content": "上海迪士尼乐园的门票价格:平日成人票价为399元,周末和节假日为499元。儿童票(1.0-1.4米)平日为299元,周末为374元。1.0米以下儿童免费。"
        },
        {
            "id": "kb_003",
            "content": "上海迪士尼乐园营业时间为上午8:00至晚上8:00,全年无休。建议出发前查看官方网站确认具体时间。"
        },
        {
            "id": "kb_004",
            "content": "从上海市区到迪士尼乐园可以乘坐地铁11号线到迪士尼站,或乘坐迪士尼专线巴士。"
        },
        {
            "id": "kb_005",
            "content": "上海迪士尼乐园的特色项目包括:创极速光轮、七个小矮人矿山车、加勒比海盗等。"
        }
    ]

    # 功能1: 创建版本
    print("功能1: 创建知识库版本")
    v1_info = version_manager.create_version(knowledge_base_v1, "v1.0", "基础版本")
    v2_info = version_manager.create_version(knowledge_base_v2, "v2.0", "增强版本")

    print(f"版本1信息:")
    print(f"  版本名: {v1_info['version_name']}")
    print(f"  描述: {v1_info['description']}")
    print(f"  知识切片数量: {v1_info['statistics']['total_chunks']}")
    print(f"  平均切片长度: {v1_info['statistics']['average_chunk_length']:.0f}字符")

    print(f"\n版本2信息:")
    print(f"  版本名: {v2_info['version_name']}")
    print(f"  描述: {v2_info['description']}")
    print(f"  知识切片数量: {v2_info['statistics']['total_chunks']}")
    print(f"  平均切片长度: {v2_info['statistics']['average_chunk_length']:.0f}字符")

    print("\n" + "=" * 60 + "\n")

    # 功能示例2: 版本比较
    print("功能2: 版本差异比较")
    comparison = version_manager.compare_versions("v1.0", "v2.0")

    print(f"版本比较结果:")
    changes = comparison['changes']
    print(f"  新增知识切片: {len(changes['added_chunks'])}个")
    print(f"  删除知识切片: {len(changes['removed_chunks'])}个")
    print(f"  修改知识切片: {len(changes['modified_chunks'])}个")

    print(f"\n新增的知识切片:")
    for i, chunk in enumerate(changes['added_chunks'], 1):
        print(f"  {i}. ID: {chunk['id']}")
        print(f"     内容: {chunk['content']}")

    print(f"\n修改的知识切片:")
    for i, chunk in enumerate(changes['modified_chunks'], 1):
        print(f"  {i}. ID: {chunk['id']}")
        print(f"     旧内容: {chunk['old_content']}")
        print(f"     新内容: {chunk['new_content']}")

    print("\n" + "=" * 60 + "\n")

    # 功能3: 性能评估
    print("功能3: 版本性能评估")

    test_queries = [
        {"query": "上海迪士尼乐园在哪里?", "expected_answer": "浦东新区"},  # 关键词包含即正确
        {"query": "门票多少钱?", "expected_answer": "价格"},
        {"query": "营业时间是什么?", "expected_answer": "8:00"},
        {"query": "怎么去迪士尼?", "expected_answer": "地铁"},
        {"query": "有什么好玩的项目?", "expected_answer": "项目"}
    ]

    perf_v1 = version_manager.evaluate_version_performance("v1.0", test_queries)
    perf_v2 = version_manager.evaluate_version_performance("v2.0", test_queries)

    print(f"版本1性能:")
    print(f"  准确率: {perf_v1['overall_metrics']['accuracy'] * 100:.1f}%")
    print(f"  平均响应时间: {perf_v1['overall_metrics']['avg_response_time'] * 1000:.1f}ms")

    print(f"\n版本2性能:")
    print(f"  准确率: {perf_v2['overall_metrics']['accuracy'] * 100:.1f}%")
    print(f"  平均响应时间: {perf_v2['overall_metrics']['avg_response_time'] * 1000:.1f}ms")

    print("\n" + "=" * 60 + "\n")

    # 功能4: 性能比较
    print("功能4: 性能比较与建议")
    perf_comparison = version_manager.compare_version_performance("v1.0", "v2.0", test_queries)

    print(f"性能比较结果:")
    comp = perf_comparison['performance_comparison']
    print(f"  准确率提升: {comp['accuracy']['improvement'] * 100:.1f}%")
    print(f"  响应时间变化: {comp['response_time']['improvement'] * 1000:.1f}ms")
    print(f"  建议: {perf_comparison['recommendation']}")

    print("\n" + "=" * 60 + "\n")

    # 功能5: 回归测试
    print("功能5: 回归测试")
    regression_v2 = version_manager.generate_regression_test("v2.0", test_queries)

    print(f"回归测试结果:")
    print(f"  测试通过率: {regression_v2['pass_rate'] * 100:.1f}%")
    print(f"  测试用例数量: {len(regression_v2['test_results'])}")

    print(f"\n详细测试结果:")
    for i, result in enumerate(regression_v2['test_results'], 1):
        status = "✓" if result['passed'] else "✗"
        print(f"  {i}. {result['query']} {status}")


if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545

# 总结

  • 知识库问题生成与检索优化
    为知识库内容生成多样化问题,使用BM25算法进行文本检索优化
    LLM作用:生成多样化问题,评估检索质量,优化知识库结构

  • 对话知识沉淀
    从用户对话中提取有价值的知识点,进行智能合并和分类
    LLM作用:提取对话中的知识点,智能合并相似知识,生成结构化知识内容

  • 知识库健康度检查
    检查知识库的完整性、时效性和一致性,识别知识空白和冲突
    LLM作用:分析知识缺失、检查过时内容、识别知识冲突,生成健康度报告

  • 知识库版本管理与性能比较
    Embedding作用:构建向量索引,进行语义检索,支持版本间的性能评估
    版本差异检测使用精确文本匹配(简化版,没有使用到LLM)