#实战项目三:语义搜索与问答系统
#目录
#项目概述
# 语义搜索与问答系统是现代NLP应用的核心组件,通过向量检索技术实现语义层面的精确匹配。本项目将构建一个完整的FAQ智能问答系统,支持自然语言提问并返回精准答案。
#项目目标
def project_goals():
    """Print the project goals grouped by category (core tech, performance, business)."""
    goals = {
        "核心技术": [
            "语义向量化:将文本编码为高维向量",
            "向量检索:基于语义相似度搜索",
            "RAG架构:检索增强生成",
            "向量数据库:高效存储与检索"
        ],
        "性能指标": [
            "检索准确率 > 90%",
            "响应时间 < 500ms",
            "支持并发 > 100 QPS",
            "支持多种文档格式"
        ],
        "业务目标": [
            "提升客服效率",
            "降低人工成本",
            "提高用户满意度",
            "支持智能知识管理"
        ]
    }
    print("项目目标:")
    for category, items in goals.items():
        print(f"\n{category}:")
        for item in items:
            print(f" ✓ {item}")


project_goals()  # Core technology stack
def technology_stack():
    """Print the technology stack used for each layer of the system."""
    stack = {
        "向量化模型": ["sentence-transformers", "BERT", "RoBERTa"],
        "向量数据库": ["FAISS", "Milvus", "Chroma", "Weaviate"],
        "后端框架": ["FastAPI", "uvicorn", "redis"],
        "前端框架": ["Streamlit", "React", "Vue"],
        "LLM集成": ["OpenAI", "Hugging Face", "本地模型"],
        "部署方案": ["Docker", "Kubernetes", "云服务"]
    }
    print("核心技术栈:")
    for category, components in stack.items():
        print(f" {category}: {', '.join(components)}")


technology_stack()  # System architecture design
#整体架构
def system_architecture():
    """Print an ASCII diagram of the system's layered architecture."""
    print("语义搜索与问答系统架构:")
    architecture_flow = """
┌─────────────────────────────────────────────────────────────────┐
│                          用户界面层                               │
│                    (Web UI / Mobile App)                        │
└─────────────────────────────────────────────────────────────────┘
                                │
┌─────────────────────────────────────────────────────────────────┐
│                          API网关层                                │
│                     (FastAPI / Nginx)                           │
└─────────────────────────────────────────────────────────────────┘
                                │
┌─────────────────────────────────────────────────────────────────┐
│                         问答处理层                                │
│  ┌─────────────────┐              ┌─────────────────┐            │
│  │   查询处理模块    │              │   RAG处理模块    │            │
│  │   - 预处理       │              │   - 检索        │            │
│  │   - 清洗         │              │   - 生成        │            │
│  │   - 路由         │              │   - 重排        │            │
│  └─────────────────┘              └─────────────────┘            │
└─────────────────────────────────────────────────────────────────┘
                                │
┌─────────────────────────────────────────────────────────────────┐
│                           存储层                                  │
│  ┌─────────────────┐              ┌─────────────────┐            │
│  │   向量数据库     │              │     知识库       │            │
│  │  (FAISS/Milvus) │              │   (FAQ/文档)    │            │
│  │  - 向量存储      │              │  - 结构化数据    │            │
│  │  - 相似度检索    │              │  - 非结构化数据  │            │
│  └─────────────────┘              └─────────────────┘            │
└─────────────────────────────────────────────────────────────────┘
                                │
┌─────────────────────────────────────────────────────────────────┐
│                          AI模型层                                 │
│  ┌─────────────────┐              ┌─────────────────┐            │
│  │    嵌入模型      │              │    语言模型      │            │
│  │  (BERT/Sentence │              │  (GPT/LLaMA/    │            │
│  │  -Transformer)  │              │   本地模型)     │            │
│  │  - 文本向量化    │              │  - 生成回答      │            │
│  └─────────────────┘              └─────────────────┘            │
└─────────────────────────────────────────────────────────────────┘
"""
    print(architecture_flow)


system_architecture()  # Architecture advantages
def architecture_advantages():
    """Print the advantages of the layered RAG architecture."""
    advantages = [
        "模块化设计:各组件职责清晰,便于维护",
        "可扩展性:支持水平扩展应对高并发",
        "灵活性:支持多种模型和数据库",
        "高性能:向量检索毫秒级响应",
        "准确性:语义理解超越关键词匹配",
        "可解释性:检索结果可追溯"
    ]
    print("架构优势:")
    for advantage in advantages:
        print(f" ✓ {advantage}")


architecture_advantages()  # Data preparation and preprocessing
#FAQ数据结构
def faq_data_structure():
    """Show the FAQ record schema and print the sample preprocessing code.

    Note: the embedded sample code is held in a single-quoted triple string
    so its own `\"\"\"` docstrings do not terminate the literal early.
    """
    print("FAQ数据结构设计:")
    faq_example = {
        "id": "faq_001",
        "question": "密码忘记了怎么办?",
        "answer": "请访问登录页面,点击'忘记密码',输入注册邮箱,系统会发送重置链接到您的邮箱。或者联系客服人工处理。",
        "category": "账户",
        "tags": ["密码", "重置", "账户"],
        "created_at": "2024-01-01",
        "updated_at": "2024-01-01",
        "status": "active",
        "similarity_questions": [
            "密码忘记了怎么找回?",
            "忘记密码了怎么办?",
            "密码丢失如何重置?"
        ]
    }
    print("FAQ数据结构示例:")
    import json
    print(json.dumps(faq_example, ensure_ascii=False, indent=2))
    # Sample preprocessing code (displayed, not executed)
    data_preprocessing_code = '''
import pandas as pd
import re
import json
from datetime import datetime

class FAQPreprocessor:
    def __init__(self):
        self.stop_words = set(['的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这'])

    def clean_text(self, text):
        """
        文本清洗
        """
        # 去除特殊字符
        text = re.sub(r'[\\\\/:*?"<>|]', ' ', text)
        # 去除多余空白
        text = re.sub(r'\\s+', ' ', text).strip()
        # 去除HTML标签
        text = re.sub(r'<[^>]+>', '', text)
        return text

    def normalize_text(self, text):
        """
        文本标准化
        """
        # 统一标点符号
        text = text.replace('?', '?').replace('!', '!').replace(',', ',')
        # 去除停用词(可选)
        words = [w for w in text.split() if w not in self.stop_words]
        return ' '.join(words)

    def prepare_faq_data(self, raw_data):
        """
        准备FAQ数据
        """
        processed_data = []
        for item in raw_data:
            processed_item = {
                'id': item.get('id', f"faq_{len(processed_data)+1}"),
                'question': self.clean_text(item['question']),
                'answer': self.clean_text(item['answer']),
                'category': item.get('category', 'general'),
                'tags': item.get('tags', []),
                'created_at': item.get('created_at', datetime.now().isoformat()),
                'updated_at': item.get('updated_at', datetime.now().isoformat()),
                'status': item.get('status', 'active'),
                'similarity_questions': [
                    self.clean_text(q) for q in item.get('similarity_questions', [])
                ]
            }
            # 构建完整搜索文本
            search_text = f"{processed_item['question']} {' '.join(processed_item['similarity_questions'])}"
            processed_item['search_text'] = search_text
            processed_data.append(processed_item)
        return processed_data

# 示例数据准备
raw_faq_data = [
    {
        "question": "密码忘记了怎么办?",
        "answer": "请访问登录页面,点击'忘记密码',输入注册邮箱,系统会发送重置链接到您的邮箱。或者联系客服人工处理。",
        "category": "账户",
        "tags": ["密码", "重置", "账户"]
    },
    {
        "question": "如何修改个人信息?",
        "answer": "登录后点击右上角头像 → '个人中心' → '编辑资料',即可修改昵称、头像、简介等信息。",
        "category": "账户",
        "tags": ["个人信息", "修改", "资料"]
    },
    # ... 更多FAQ条目
]
preprocessor = FAQPreprocessor()
processed_faq_data = preprocessor.prepare_faq_data(raw_faq_data)
'''
    print("\n数据预处理代码:")
    print(data_preprocessing_code)


faq_data_structure()  # Data quality checks
def data_quality_check():
    """Print the data quality checklist and the sample validation code."""
    print("数据质量检查:")
    quality_checks = [
        "完整性检查:确保必填字段不为空",
        "一致性检查:统一格式和标准",
        "准确性检查:验证答案正确性",
        "重复性检查:识别重复问答对",
        "相关性检查:确保问题答案匹配",
        "长度检查:过滤过长或过短内容"
    ]
    print("质量检查项目:")
    for check in quality_checks:
        print(f" ✓ {check}")
    # Sample validation code; single-quoted triple string so the embedded
    # `"""` docstring does not terminate this literal early.
    quality_check_code = '''
def validate_faq_data(faq_data):
    """
    验证FAQ数据质量
    """
    issues = []
    for i, item in enumerate(faq_data):
        # 检查必填字段
        if not item.get('question') or not item.get('answer'):
            issues.append(f"第{i+1}条:缺少问题或答案")
        # 检查长度
        if len(item.get('question', '')) < 5:
            issues.append(f"第{i+1}条:问题过短")
        if len(item.get('answer', '')) < 10:
            issues.append(f"第{i+1}条:答案过短")
        # 检查重复
        for j, other_item in enumerate(faq_data[i+1:], i+1):
            if item['question'] == other_item['question']:
                issues.append(f"第{i+1}条和第{j+1}条:问题重复")
    return issues

# 使用示例
validation_issues = validate_faq_data(processed_faq_data)
if validation_issues:
    print("发现以下问题:")
    for issue in validation_issues:
        print(f" - {issue}")
else:
    print("数据质量检查通过")
'''
    print("\n数据质量检查代码:")
    print(quality_check_code)


data_quality_check()  # Vectorization and index building
#嵌入模型选择
def embedding_model_selection():
    """Print an embedding-model comparison table and the sample embedder code."""
    print("嵌入模型选择:")
    models_comparison = [
        {
            "模型": "all-MiniLM-L6-v2",
            "维度": 384,
            "大小": "~80MB",
            "速度": "快",
            "中文支持": "一般",
            "适用场景": "轻量级应用"
        },
        {
            "模型": "paraphrase-multilingual-MiniLM-L12-v2",
            "维度": 768,
            "大小": "~200MB",
            "速度": "中等",
            "中文支持": "好",
            "适用场景": "多语言应用"
        },
        {
            "模型": "bge-large-zh-v1.5",
            "维度": 1024,
            "大小": "~1.3GB",
            "速度": "较慢",
            "中文支持": "优秀",
            "适用场景": "中文专用"
        },
        {
            "模型": "text-embedding-ada-002",
            "维度": 1536,
            "大小": "云端",
            "速度": "快",
            "中文支持": "好",
            "适用场景": "商业应用"
        }
    ]
    print(f"{'模型':<35} {'维度':<8} {'大小':<10} {'速度':<8} {'中文支持':<10} {'适用场景':<15}")
    print("-" * 95)
    for model in models_comparison:
        print(f"{model['模型']:<35} {model['维度']:<8} {model['大小']:<10} {model['速度']:<8} {model['中文支持']:<10} {model['适用场景']:<15}")
    # Sample embedder code; single-quoted triple string so embedded
    # `"""` docstrings do not terminate this literal early.
    embedding_implementation = '''
from sentence_transformers import SentenceTransformer
import numpy as np
import torch

class FAQEmbedder:
    def __init__(self, model_name='paraphrase-multilingual-MiniLM-L12-v2'):
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)

    def encode_texts(self, texts, batch_size=32):
        """
        批量编码文本
        """
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            batch_embeddings = self.model.encode(
                batch,
                convert_to_tensor=True,
                show_progress_bar=True
            )
            embeddings.append(batch_embeddings.cpu().numpy())
        return np.vstack(embeddings)

    def encode_single_text(self, text):
        """
        编码单个文本
        """
        embedding = self.model.encode([text], convert_to_tensor=True)
        return embedding.cpu().numpy().flatten()

# 使用示例
embedder = FAQEmbedder('paraphrase-multilingual-MiniLM-L12-v2')
# 编码FAQ问题
questions = [item['question'] for item in processed_faq_data]
question_embeddings = embedder.encode_texts(questions)
print(f"编码了{len(questions)}个问题,向量维度:{question_embeddings.shape[1]}")
'''
    print("\n嵌入模型实现代码:")
    print(embedding_implementation)


embedding_model_selection()  # Vector index building
def vector_index_building():
    """Print the index-method comparison and the sample FAISS index-builder code."""
    print("向量索引构建:")
    indexing_methods = [
        {
            "方法": "FAISS IndexFlatIP",
            "特点": "精确搜索,适合小数据集",
            "性能": "查询快,内存占用大",
            "适用": "小规模FAQ系统"
        },
        {
            "方法": "FAISS IndexIVFFlat",
            "特点": "近似搜索,适合中等数据集",
            "性能": "平衡速度和精度",
            "适用": "中等规模系统"
        },
        {
            "方法": "FAISS IndexIVFPQ",
            "特点": "量化索引,节省内存",
            "性能": "大幅节省内存",
            "适用": "大规模系统"
        },
        {
            "方法": "Milvus",
            "特点": "企业级向量数据库",
            "性能": "高并发,分布式",
            "适用": "生产环境"
        }
    ]
    for method in indexing_methods:
        print(f"\n{method['方法']}:")
        print(f" 特点: {method['特点']}")
        print(f" 性能: {method['性能']}")
        print(f" 适用: {method['适用']}")
    # Sample index-building code; single-quoted triple string so embedded
    # `"""` docstrings do not terminate this literal early.
    index_building_code = '''
import faiss
import numpy as np
import pickle
import os

class VectorIndexBuilder:
    def __init__(self, dimension=768):
        self.dimension = dimension

    def build_faiss_index(self, embeddings, index_type='flat'):
        """
        构建FAISS索引
        """
        embeddings = embeddings.astype('float32')
        if index_type == 'flat':
            # 精确搜索索引
            index = faiss.IndexFlatIP(self.dimension)
        elif index_type == 'ivf':
            # IVF索引(需要训练)
            nlist = min(100, len(embeddings) // 10)  # 聚类中心数
            quantizer = faiss.IndexFlatIP(self.dimension)
            index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
            # 训练索引
            if len(embeddings) > nlist:
                index.train(embeddings)
        elif index_type == 'pq':
            # 乘积量化索引
            nlist = min(100, len(embeddings) // 10)
            m = self.dimension // 8  # 子空间数
            index = faiss.IndexIVFPQ(
                faiss.IndexFlatIP(self.dimension),
                self.dimension,
                nlist,
                m,
                8  # 每个子向量的比特数
            )
            if len(embeddings) > nlist:
                index.train(embeddings)
        # 添加向量
        index.add(embeddings)
        return index

    def normalize_embeddings(self, embeddings):
        """
        L2归一化(用于余弦相似度)
        """
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / (norms + 1e-8)

    def save_index(self, index, filepath):
        """
        保存索引
        """
        faiss.write_index(index, filepath)

    def load_index(self, filepath):
        """
        加载索引
        """
        return faiss.read_index(filepath)

# 构建索引
indexer = VectorIndexBuilder(dimension=question_embeddings.shape[1])
# 归一化向量(用于余弦相似度)
normalized_embeddings = indexer.normalize_embeddings(question_embeddings)
# 构建索引
faq_index = indexer.build_faiss_index(normalized_embeddings, index_type='flat')
# 保存索引和FAQ数据
indexer.save_index(faq_index, 'faq_vectors.index')
with open('faq_data.pkl', 'wb') as f:
    pickle.dump(processed_faq_data, f)
print(f"向量索引构建完成,包含{faq_index.ntotal}个向量")
'''
    print("\n索引构建代码:")
    print(index_building_code)


vector_index_building()  # Semantic retrieval implementation
#检索算法实现
def semantic_retrieval_implementation():
    """Print the retrieval algorithms overview and the sample retriever code."""
    print("语义检索算法实现:")
    retrieval_algorithms = [
        "余弦相似度:最常用的语义相似度计算",
        "欧氏距离:适用于归一化向量",
        "内积相似度:FAISS默认的相似度计算",
        "Jaccard相似度:适用于稀疏向量"
    ]
    print("检索算法:")
    for algo in retrieval_algorithms:
        print(f" ✓ {algo}")
    # Sample retriever code; single-quoted triple string so embedded
    # `"""` docstrings do not terminate this literal early.
    retrieval_implementation = '''
import faiss
import numpy as np
import pickle
from typing import List, Dict, Tuple
import time

class SemanticRetriever:
    def __init__(self, index_path: str, data_path: str, embedder: FAQEmbedder):
        self.index = faiss.read_index(index_path)
        with open(data_path, 'rb') as f:
            self.faq_data = pickle.load(f)
        self.embedder = embedder

    def search(self, query: str, top_k: int = 5, threshold: float = 0.3) -> List[Dict]:
        """
        语义检索
        """
        start_time = time.time()
        # 编码查询
        query_embedding = self.embedder.encode_single_text(query)
        query_embedding = query_embedding.astype('float32')
        query_embedding = query_embedding / (np.linalg.norm(query_embedding) + 1e-8)  # 归一化
        # 检索
        scores, indices = self.index.search(query_embedding.reshape(1, -1), top_k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx >= 0 and score >= threshold:  # 有效结果且满足阈值
                faq_item = self.faq_data[idx]
                results.append({
                    'id': faq_item['id'],
                    'question': faq_item['question'],
                    'answer': faq_item['answer'],
                    'category': faq_item['category'],
                    'score': float(score),
                    'index': int(idx)
                })
        search_time = time.time() - start_time
        return {
            'results': results,
            'search_time': search_time,
            'total_found': len(results)
        }

    def batch_search(self, queries: List[str], top_k: int = 5) -> List[List[Dict]]:
        """
        批量检索
        """
        query_embeddings = self.embedder.encode_texts(queries)
        query_embeddings = query_embeddings.astype('float32')
        # 归一化
        norms = np.linalg.norm(query_embeddings, axis=1, keepdims=True)
        query_embeddings = query_embeddings / (norms + 1e-8)
        scores, indices = self.index.search(query_embeddings, top_k)
        all_results = []
        for query_idx in range(len(queries)):
            results = []
            for score, idx in zip(scores[query_idx], indices[query_idx]):
                if idx >= 0:
                    faq_item = self.faq_data[idx]
                    results.append({
                        'question': faq_item['question'],
                        'answer': faq_item['answer'],
                        'score': float(score)
                    })
            all_results.append(results)
        return all_results

# 使用示例
retriever = SemanticRetriever('faq_vectors.index', 'faq_data.pkl', embedder)
# 单次检索
search_result = retriever.search("密码忘记了怎么办?", top_k=3)
print("检索结果:")
for result in search_result['results']:
    print(f" 问题: {result['question']}")
    print(f" 相似度: {result['score']:.4f}")
    print(f" 答案: {result['answer'][:50]}...")
    print()
'''
    print("语义检索实现代码:")
    print(retrieval_implementation)


semantic_retrieval_implementation()  # Retrieval optimization strategies
def retrieval_optimization():
    """Print the retrieval-optimization strategies and the hybrid-search sample code."""
    print("检索优化策略:")
    optimization_strategies = [
        {
            "策略": "多字段检索",
            "描述": "同时检索问题、答案、标签等多个字段",
            "效果": "提高检索覆盖率"
        },
        {
            "策略": "查询扩展",
            "描述": "使用同义词、相关词扩展查询",
            "效果": "提高召回率"
        },
        {
            "策略": "重排序",
            "描述": "对初步检索结果进行重排序",
            "效果": "提高精确率"
        },
        {
            "策略": "缓存机制",
            "描述": "缓存热门查询结果",
            "效果": "提升响应速度"
        },
        {
            "策略": "分层检索",
            "描述": "先粗筛再精排",
            "效果": "平衡效率和准确性"
        }
    ]
    for strategy in optimization_strategies:
        print(f"\n{strategy['策略']}:")
        print(f" 描述: {strategy['描述']}")
        print(f" 效果: {strategy['效果']}")
    # Sample hybrid-retrieval code; single-quoted triple string so embedded
    # `"""` docstrings do not terminate this literal early.
    optimized_retrieval_code = '''
from collections import defaultdict
import heapq

class OptimizedRetriever(SemanticRetriever):
    def __init__(self, index_path: str, data_path: str, embedder: FAQEmbedder):
        super().__init__(index_path, data_path, embedder)
        self.query_cache = {}
        self.cache_size = 1000

    def hybrid_search(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        混合检索:语义+关键词
        """
        # 语义检索
        semantic_results = self.search(query, top_k * 2)['results']
        # 关键词匹配(简单实现)
        keyword_results = self.keyword_match(query, top_k * 2)
        # 融合结果
        combined_scores = defaultdict(float)
        # 语义分数
        for i, result in enumerate(semantic_results):
            combined_scores[result['id']] += result['score'] * 0.7  # 语义权重
        # 关键词分数
        for i, result in enumerate(keyword_results):
            combined_scores[result['id']] += result['keyword_score'] * 0.3  # 关键词权重
        # 排序并返回top_k
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
        final_results = []
        for faq_id, score in sorted_results:
            faq_item = next(item for item in self.faq_data if item['id'] == faq_id)
            final_results.append({
                'question': faq_item['question'],
                'answer': faq_item['answer'],
                'category': faq_item['category'],
                'score': score,
                'id': faq_id
            })
        return final_results

    def keyword_match(self, query: str, top_k: int) -> List[Dict]:
        """
        简单关键词匹配
        """
        import re
        from difflib import SequenceMatcher
        query_words = set(re.findall(r'\\w+', query.lower()))
        results = []
        for faq_item in self.faq_data:
            text = f"{faq_item['question']} {faq_item['answer']}".lower()
            text_words = set(re.findall(r'\\w+', text))
            # 计算关键词匹配度
            common_words = query_words.intersection(text_words)
            keyword_score = len(common_words) / max(len(query_words), 1)
            # 计算文本相似度
            similarity = SequenceMatcher(None, query.lower(), text).ratio()
            total_score = keyword_score * 0.6 + similarity * 0.4
            results.append({
                'id': faq_item['id'],
                'question': faq_item['question'],
                'answer': faq_item['answer'],
                'keyword_score': total_score
            })
        # 按分数排序
        results.sort(key=lambda x: x['keyword_score'], reverse=True)
        return results[:top_k]

# 使用优化检索
optimized_retriever = OptimizedRetriever('faq_vectors.index', 'faq_data.pkl', embedder)
hybrid_results = optimized_retriever.hybrid_search("密码忘记了怎么办?", top_k=3)
'''
    print("\n优化检索代码:")
    print(optimized_retrieval_code)


retrieval_optimization()  # RAG question-answering system
#RAG架构实现
def rag_architecture_implementation():
    """Print the RAG workflow overview and the sample RAG system code.

    The embedded sample contains both `\"\"\"` docstrings and `'''` prompt
    literals, so the outer literal uses `'''` and the inner `'''` are escaped.
    """
    print("RAG(检索增强生成)架构实现:")
    rag_process = """
RAG系统工作流程:
1. 查询理解
 - 解析用户问题意图
 - 识别关键实体和概念
 - 可能的查询改写
2. 信息检索
 - 使用查询检索相关文档
 - 获取top-k最相关的结果
 - 对结果进行相关性评分
3. 上下文构建
 - 将检索结果整合为上下文
 - 过滤不相关信息
 - 格式化为模型输入
4. 答案生成
 - 使用LLM基于上下文生成答案
 - 保持事实准确性
 - 生成可解释的回答
5. 后处理
 - 答案验证和优化
 - 引用来源标注
 - 质量评估
"""
    print(rag_process)
    rag_implementation = '''
from openai import OpenAI
import re
from typing import List, Dict, Optional

class RAGSystem:
    def __init__(self, retriever: SemanticRetriever, llm_client=None):
        self.retriever = retriever
        self.llm_client = llm_client  # 可选的LLM客户端
        self.max_context_length = 3000  # 最大上下文长度

    def build_context(self, query: str, retrieved_docs: List[Dict]) -> str:
        """
        构建上下文
        """
        context_parts = []
        for i, doc in enumerate(retrieved_docs, 1):
            doc_context = f"""
文档 {i}:
问题: {doc['question']}
答案: {doc['answer']}
相似度: {doc['score']:.4f}
""".strip()
            context_parts.append(doc_context)
        full_context = "\\n\\n".join(context_parts)
        # 截断过长的上下文
        if len(full_context) > self.max_context_length:
            full_context = full_context[:self.max_context_length]
            # 确保不截断文档边界
            last_doc_end = full_context.rfind('\\n\\n')
            if last_doc_end > 0:
                full_context = full_context[:last_doc_end]
        return full_context

    def generate_answer(self, query: str, context: str) -> Dict:
        """
        生成答案
        """
        if self.llm_client:
            # 使用外部LLM
            prompt = f\'\'\'
你是一个智能客服助手。请根据以下知识库信息回答用户问题。
知识库信息:
{context}
用户问题:{query}
请遵循以下要求:
1. 如果知识库中有相关信息,请基于这些信息给出准确回答
2. 如果没有相关信息,请礼貌说明无法找到答案
3. 回答应简洁明了,控制在200字以内
4. 如有必要,可说明信息来源
5. 保持客观中立的语气
\'\'\'
            try:
                response = self.llm_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=500,
                    temperature=0.3
                )
                answer = response.choices[0].message.content
                return {
                    'answer': answer,
                    'status': 'success',
                    'generated_by': 'llm'
                }
            except Exception as e:
                return {
                    'answer': '抱歉,答案生成遇到问题,请稍后重试。',
                    'status': 'error',
                    'error': str(e),
                    'generated_by': 'fallback'
                }
        else:
            # 使用规则-based方法作为备选
            return self.rule_based_answer(query, context)

    def rule_based_answer(self, query: str, context: str) -> Dict:
        """
        基于规则的答案生成(备选方案)
        """
        if not context.strip():
            return {
                'answer': '抱歉,没有找到相关信息。您可以尝试重新表述问题或联系客服。',
                'status': 'not_found',
                'generated_by': 'rule_based'
            }
        # 简单的规则:返回最相关的文档答案
        lines = context.split('\\n\\n')
        if lines:
            # 提取第一个文档的答案
            for line in lines:
                if '答案:' in line:
                    answer = line.split('答案:')[1].strip()
                    return {
                        'answer': f'根据知识库信息:{answer}',
                        'status': 'success',
                        'generated_by': 'rule_based'
                    }
        return {
            'answer': '找到了相关信息,但无法生成确切答案。请提供更多细节或联系客服。',
            'status': 'partial',
            'generated_by': 'rule_based'
        }

    def query(self, user_query: str, top_k: int = 3, threshold: float = 0.3) -> Dict:
        """
        完整的RAG查询流程
        """
        import time
        start_time = time.time()
        # 1. 语义检索
        retrieval_result = self.retriever.search(user_query, top_k, threshold)
        # 2. 构建上下文
        context = self.build_context(user_query, retrieval_result['results'])
        # 3. 生成答案
        generation_result = self.generate_answer(user_query, context)
        total_time = time.time() - start_time
        return {
            'query': user_query,
            'answer': generation_result['answer'],
            'status': generation_result['status'],
            'retrieved_docs': retrieval_result['results'],
            'context_used': context,
            'response_time': total_time,
            'retrieval_time': retrieval_result['search_time'],
            'generated_by': generation_result.get('generated_by', 'unknown')
        }

# 使用RAG系统
rag_system = RAGSystem(optimized_retriever)
# 示例查询
result = rag_system.query("密码忘记了怎么办?", top_k=3)
print(f"答案: {result['answer']}")
print(f"检索到 {len(result['retrieved_docs'])} 个相关文档")
print(f"响应时间: {result['response_time']:.4f}秒")
'''
    print("RAG系统实现代码:")
    print(rag_implementation)


rag_architecture_implementation()  # RAG optimization strategies
def rag_optimization_strategies():
    """Print RAG optimization strategies and the advanced RAG sample code."""
    print("RAG系统优化策略:")
    optimization_strategies = [
        {
            "策略": "查询改写",
            "描述": "优化用户查询以提高检索效果",
            "技术": "使用LLM重写查询"
        },
        {
            "策略": "上下文压缩",
            "描述": "压缩长上下文以适应模型限制",
            "技术": "抽取关键句子或摘要"
        },
        {
            "策略": "多跳检索",
            "描述": "进行多轮检索获取更多信息",
            "技术": "基于第一轮结果生成新查询"
        },
        {
            "策略": "自我一致性",
            "描述": "生成多个答案并选择最一致的",
            "技术": "多次生成取多数票"
        },
        {
            "策略": "置信度评估",
            "描述": "评估答案的可信度",
            "技术": "基于检索分数和生成质量"
        }
    ]
    for strategy in optimization_strategies:
        print(f"\n{strategy['策略']}:")
        print(f" 描述: {strategy['描述']}")
        print(f" 技术: {strategy['技术']}")
    # Advanced RAG sample code; single-quoted triple string so embedded
    # `"""` docstrings and f-strings do not terminate this literal early.
    rag_optimization_code = '''
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AdvancedRAGSystem(RAGSystem):
    def __init__(self, retriever: SemanticRetriever, llm_client=None):
        super().__init__(retriever, llm_client)
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def query_rewrite(self, original_query: str) -> str:
        """
        查询改写
        """
        if self.llm_client:
            rewrite_prompt = f"""
请将以下用户查询改写为更清晰、更具体的搜索查询,保持原意不变:
原查询:{original_query}
改写后的查询:
"""
            try:
                response = self.llm_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": rewrite_prompt}],
                    max_tokens=100
                )
                return response.choices[0].message.content.strip()
            except:
                return original_query
        return original_query

    def compress_context(self, context: str, max_length: int = 2000) -> str:
        """
        上下文压缩
        """
        if len(context) <= max_length:
            return context
        # 简单的压缩:保留最重要的部分
        sentences = context.split('. ')
        compressed_parts = []
        current_length = 0
        for sentence in sentences:
            if current_length + len(sentence) <= max_length:
                compressed_parts.append(sentence)
                current_length += len(sentence)
            else:
                break
        return '. '.join(compressed_parts) + '...'

    def multi_hop_retrieve(self, query: str, hops: int = 2) -> List[Dict]:
        """
        多跳检索
        """
        all_results = []
        current_query = query
        for hop in range(hops):
            hop_results = self.retriever.search(current_query, top_k=2)['results']
            all_results.extend(hop_results)
            if hop < hops - 1 and hop_results:
                # 基于结果生成新查询
                context = self.build_context(current_query, hop_results[:1])
                follow_up_prompt = f"""
基于以下信息,生成一个更具体的后续查询来获取更多信息:
上下文:{context}
原查询:{current_query}
后续查询:
"""
                if self.llm_client:
                    try:
                        response = self.llm_client.chat.completions.create(
                            model="gpt-4o-mini",
                            messages=[{"role": "user", "content": follow_up_prompt}],
                            max_tokens=100
                        )
                        current_query = response.choices[0].message.content.strip()
                    except:
                        break
        # 去重并按相关性排序
        seen_ids = set()
        unique_results = []
        for result in all_results:
            if result['id'] not in seen_ids:
                unique_results.append(result)
                seen_ids.add(result['id'])
        return unique_results[:5]  # 返回top-5

    async def query_with_optimizations(self, user_query: str) -> Dict:
        """
        带优化的查询
        """
        start_time = time.time()
        # 1. 查询改写
        rewritten_query = await self.query_rewrite(user_query)
        # 2. 多跳检索
        retrieved_docs = self.multi_hop_retrieve(rewritten_query)
        # 3. 构建并压缩上下文
        context = self.build_context(rewritten_query, retrieved_docs)
        compressed_context = self.compress_context(context)
        # 4. 生成答案
        generation_result = self.generate_answer(rewritten_query, compressed_context)
        total_time = time.time() - start_time
        return {
            'original_query': user_query,
            'rewritten_query': rewritten_query,
            'answer': generation_result['answer'],
            'status': generation_result['status'],
            'retrieved_docs': retrieved_docs,
            'context_used': compressed_context,
            'response_time': total_time,
            'generated_by': generation_result.get('generated_by', 'unknown')
        }

# 使用优化的RAG系统
advanced_rag = AdvancedRAGSystem(optimized_retriever)
# 注意:在实际使用中需要正确初始化asyncio事件循环
'''
    print("\nRAG优化代码:")
    print(rag_optimization_code)


rag_optimization_strategies()  # API service deployment
#FastAPI服务实现
def fastapi_deployment():
    """Print the sample FastAPI service implementation code."""
    print("FastAPI服务部署:")
    # Sample service code; single-quoted triple string so embedded
    # `"""` docstrings do not terminate this literal early.
    fastapi_implementation = '''
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import time
import logging
from uuid import uuid4

app = FastAPI(
    title="语义搜索与问答系统API",
    description="基于RAG架构的智能问答系统",
    version="1.0.0"
)

# 请求模型
class QueryRequest(BaseModel):
    query: str
    top_k: Optional[int] = 3
    threshold: Optional[float] = 0.3
    use_hybrid: Optional[bool] = False
    include_sources: Optional[bool] = True

class FeedbackRequest(BaseModel):
    query_id: str
    query: str
    answer: str
    rating: int  # 1-5
    helpful: bool
    comment: Optional[str] = None

# 响应模型
class QueryResponse(BaseModel):
    query_id: str
    query: str
    answer: str
    sources: Optional[List[Dict]] = None
    response_time: float
    retrieval_time: Optional[float] = None
    status: str
    debug_info: Optional[Dict] = None

# 全局RAG系统实例(在实际部署中应使用依赖注入)
rag_system = None  # 初始化时赋值

@app.on_event('startup')
async def startup_event():
    global rag_system
    # 初始化RAG系统
    rag_system = AdvancedRAGSystem(optimized_retriever)
    logging.info("RAG系统初始化完成")

@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    """
    问答查询接口
    """
    start_time = time.time()
    query_id = str(uuid4())
    try:
        logging.info(f"查询 {query_id}: {request.query}")
        if request.use_hybrid:
            # 使用混合检索
            results = optimized_retriever.hybrid_search(
                request.query,
                top_k=request.top_k
            )
            # 构建上下文并生成答案
            context = rag_system.build_context(request.query, results)
            generation_result = rag_system.generate_answer(request.query, context)
            response_data = {
                'query_id': query_id,
                'query': request.query,
                'answer': generation_result['answer'],
                'sources': results if request.include_sources else None,
                'response_time': time.time() - start_time,
                'status': generation_result['status'],
                'debug_info': {
                    'method': 'hybrid',
                    'generated_by': generation_result.get('generated_by', 'unknown')
                }
            }
        else:
            # 使用标准RAG流程
            result = rag_system.query(
                request.query,
                top_k=request.top_k,
                threshold=request.threshold
            )
            response_data = {
                'query_id': query_id,
                'query': request.query,
                'answer': result['answer'],
                'sources': result['retrieved_docs'] if request.include_sources else None,
                'response_time': result['response_time'],
                'retrieval_time': result.get('retrieval_time'),
                'status': result['status'],
                'debug_info': {
                    'method': 'standard_rag',
                    'generated_by': result['generated_by']
                }
            }
        logging.info(f"查询 {query_id} 完成,耗时: {response_data['response_time']:.4f}s")
        return response_data
    except Exception as e:
        logging.error(f"查询 {query_id} 失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"查询处理失败: {str(e)}")

@app.post("/search")
async def search_endpoint(request: QueryRequest):
    """
    仅检索接口
    """
    start_time = time.time()
    try:
        if request.use_hybrid:
            results = optimized_retriever.hybrid_search(
                request.query,
                top_k=request.top_k
            )
        else:
            search_result = optimized_retriever.search(
                request.query,
                request.top_k,
                request.threshold
            )
            results = search_result['results']
        return {
            'query': request.query,
            'results': results,
            'count': len(results),
            'search_time': time.time() - start_time
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"检索失败: {str(e)}")

@app.post("/feedback")
async def feedback_endpoint(request: FeedbackRequest):
    """
    反馈接口
    """
    # 这里可以保存反馈到数据库,用于后续模型优化
    feedback_data = {
        'query_id': request.query_id,
        'rating': request.rating,
        'helpful': request.helpful,
        'comment': request.comment,
        'timestamp': time.time()
    }
    logging.info(f"收到反馈: {feedback_data}")
    # 可以异步处理反馈数据
    # await process_feedback_async(feedback_data)
    return {"status": "received", "message": "反馈已接收"}

@app.get("/health")
async def health_check():
    """
    健康检查接口
    """
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "model_loaded": rag_system is not None
    }

@app.get("/stats")
async def get_stats():
    """
    系统统计信息
    """
    # 这里可以返回实际的系统统计信息
    return {
        "total_queries": 0,  # 从数据库或缓存获取
        "avg_response_time": 0.0,
        "success_rate": 100.0,
        "uptime": time.time()  # 实际运行时间
    }

# 启动命令: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
'''
    print("FastAPI服务实现代码:")
    print(fastapi_implementation)


fastapi_deployment()  # Docker deployment configuration
def docker_deployment_config():
    """Print the Dockerfile, requirements.txt and docker-compose.yml templates."""
    print("Docker部署配置:")
    dockerfile_content = """
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update \\
    && apt-get install -y gcc g++ build-essential \\
    && rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建模型缓存目录
RUN mkdir -p /root/.cache/huggingface
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
"""
    requirements_content = """
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
sentence-transformers==2.2.2
faiss-cpu==1.7.4
torch==2.1.0
transformers==4.35.0
pandas==2.1.0
numpy==1.24.3
openai==1.3.0
pydantic==2.4.2
python-multipart==0.0.6
redis==5.0.1
"""
    docker_compose_content = """
# docker-compose.yml
version: '3.8'
services:
  rag-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - TRANSFORMERS_CACHE=/root/.cache/huggingface
      - HF_HOME=/root/.cache/huggingface
    volumes:
      - ./models:/app/models
      - ./data:/app/data
      - ~/.cache/huggingface:/root/.cache/huggingface
    deploy:
      resources:
        limits:
          memory: 8G
          cpus: '4'
    restart: unless-stopped
    depends_on:
      - redis
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped
volumes:
  redis_data:
"""
    print("Dockerfile:")
    print(dockerfile_content)
    print("\nrequirements.txt:")
    print(requirements_content)
    print("\ndocker-compose.yml:")
    print(docker_compose_content)


docker_deployment_config()  # Front-end UI development
#Streamlit前端实现
def streamlit_frontend():
    """Print the sample Streamlit front-end application code."""
    print("Streamlit前端界面:")
    streamlit_app_code = """
import streamlit as st
import requests
import json
import time
from datetime import datetime

# 页面配置
st.set_page_config(
    page_title="FAQ智能问答系统",
    page_icon="🤖",
    layout="wide"
)

# 初始化session state
if 'conversation_history' not in st.session_state:
    st.session_state.conversation_history = []

# 标题
st.title("🤖 FAQ 智能问答系统")
st.markdown("---")

# 侧边栏配置
with st.sidebar:
    st.header("⚙️ 系统设置")
    api_base_url = st.text_input(
        "API地址",
        value="http://localhost:8000",
        help="后端API服务地址"
    )
    top_k = st.slider("返回结果数", 1, 10, 3)
    threshold = st.slider("相似度阈值", 0.0, 1.0, 0.3, step=0.05)
    use_hybrid = st.checkbox("使用混合检索", value=False)
    show_sources = st.checkbox("显示来源", value=True)
    st.markdown("---")
    st.info("💡 提示:输入您的问题,系统将为您智能解答")

# 主界面
col1, col2 = st.columns([2, 1])
with col1:
    # 输入区域
    user_input = st.text_area(
        "请输入您的问题:",
        placeholder="例如:密码忘记了怎么办?如何修改个人信息?",
        height=100
    )
    # 查询按钮
    if st.button("🔍 提问", type="primary", use_container_width=True):
        if user_input.strip():
            with st.spinner("🧠 思考中..."):
                try:
                    # 调用API
                    api_url = f"{api_base_url}/query"
                    payload = {
                        "query": user_input,
                        "top_k": top_k,
                        "threshold": threshold,
                        "use_hybrid": use_hybrid,
                        "include_sources": show_sources
                    }
                    start_time = time.time()
                    response = requests.post(api_url, json=payload)
                    end_time = time.time()
                    if response.status_code == 200:
                        result = response.json()
                        # 显示答案
                        st.success(result['answer'])
                        # 显示性能信息
                        st.info(f"⏱️ 响应时间: {result['response_time']:.3f}秒")
                        # 显示来源
                        if show_sources and result.get('sources'):
                            with st.expander("📋 查看参考来源"):
                                for i, source in enumerate(result['sources'], 1):
                                    with st.container():
                                        st.markdown(f"**来源 {i}:**")
                                        st.markdown(f"**问题:** {source['question']}")
                                        st.markdown(f"**答案:** {source['answer'][:200]}...")
                                        st.markdown(f"*相似度: {source['score']:.3f}*")
                                        st.markdown("---")
                        # 添加到对话历史
                        st.session_state.conversation_history.append({
                            'question': user_input,
                            'answer': result['answer'],
                            'timestamp': datetime.now().strftime("%H:%M:%S"),
                            'sources': result.get('sources', [])
                        })
                    else:
                        st.error(f"❌ API调用失败: {response.text}")
                except requests.exceptions.ConnectionError:
                    st.error("❌ 无法连接到后端服务,请检查API地址是否正确")
                except Exception as e:
                    st.error(f"❌ 发生错误: {str(e)}")
        else:
            st.warning("⚠️ 请输入问题")
with col2:
    st.header("💬 对话历史")
    if st.session_state.conversation_history:
        # 显示最近的对话
        for i, conv in enumerate(reversed(st.session_state.conversation_history[-5:])):
            with st.expander(f"⏰ {conv['timestamp']}", expanded=False):
                st.markdown(f"**您:** {conv['question']}")
                st.markdown(f"**系统:** {conv['answer'][:100]}...")
    else:
        st.info("暂无对话历史")
    # 清空历史
    if st.button("🗑️ 清空历史"):
        st.session_state.conversation_history = []
        st.rerun()

# 反馈功能
st.markdown("---")
st.subheader("⭐ 反馈与评价")
if st.session_state.conversation_history:
    latest_conv = st.session_state.conversation_history[-1]
    col_rate1, col_rate2 = st.columns(2)
    with col_rate1:
        rating = st.select_slider(
            "满意度评分",
            options=[1, 2, 3, 4, 5],
            format_func=lambda x: ['1 ⭐', '2 ⭐', '3 ⭐', '4 ⭐', '5 ⭐'][x-1]
        )
    with col_rate2:
        helpful = st.radio("答案是否有帮助?", ("是", "否"), horizontal=True)
    feedback_comment = st.text_area("其他意见或建议:", height=60)
    if st.button("📤 提交反馈"):
        try:
            feedback_payload = {
                "query_id": latest_conv.get('query_id', ''),
                "query": latest_conv['question'],
                "answer": latest_conv['answer'],
                "rating": rating,
                "helpful": helpful == "是",
                "comment": feedback_comment
            }
            feedback_response = requests.post(
                f"{api_base_url}/feedback",
                json=feedback_payload
            )
            if feedback_response.status_code == 200:
                st.success("✅ 感谢您的反馈!")
            else:
                st.error("❌ 反馈提交失败")
        except:
            st.error("❌ 反馈提交失败")

# 页脚
st.markdown("---")
st.caption("Powered by RAG System | Built with Streamlit")
"""
    print("Streamlit前端实现代码:")
    print(streamlit_app_code)


streamlit_frontend()  # Performance optimization
#系统性能优化策略
def performance_optimization():
    """Print the performance-optimization overview and the optimizer sample code."""
    print("系统性能优化:")
    optimization_areas = [
        {
            "优化领域": "向量化性能",
            "策略": "批量处理、GPU加速、模型量化",
            "目标": "提高向量化速度"
        },
        {
            "优化领域": "检索性能",
            "策略": "索引优化、缓存机制、近似搜索",
            "目标": "毫秒级检索响应"
        },
        {
            "优化领域": "API性能",
            "策略": "异步处理、连接池、负载均衡",
            "目标": "高并发支持"
        },
        {
            "优化领域": "存储性能",
            "策略": "向量数据库优化、数据分区",
            "目标": "高效存储访问"
        }
    ]
    for area in optimization_areas:
        print(f"\n{area['优化领域']}:")
        print(f" 策略: {area['策略']}")
        print(f" 目标: {area['目标']}")
    # Sample optimizer code; single-quoted triple string so embedded
    # `"""` docstrings do not terminate this literal early.
    performance_optimization_code = '''
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import redis
import pickle
from functools import lru_cache
import time

class PerformanceOptimizer:
    def __init__(self):
        # Redis缓存
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        # 线程池
        self.executor = ThreadPoolExecutor(max_workers=10)
        # 本地缓存
        self.local_cache = {}

    @lru_cache(maxsize=1000)
    def cached_embed_query(self, query: str):
        """
        缓存查询向量化结果
        """
        return embedder.encode_single_text(query)

    def get_cached_result(self, query: str, top_k: int) -> Optional[Dict]:
        """
        从缓存获取结果
        """
        cache_key = f"search:{hash(query)}:{top_k}"
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return pickle.loads(cached_result)
        return None

    def cache_result(self, query: str, top_k: int, result: Dict, ttl: int = 3600):
        """
        缓存搜索结果
        """
        cache_key = f"search:{hash(query)}:{top_k}"
        self.redis_client.setex(
            cache_key,
            ttl,
            pickle.dumps(result)
        )

    async def async_batch_search(self, queries: List[str], top_k: int = 3):
        """
        异步批量搜索
        """
        tasks = []
        for query in queries:
            task = asyncio.create_task(self.async_single_search(query, top_k))
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return results

    async def async_single_search(self, query: str, top_k: int):
        """
        异步单次搜索
        """
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: optimized_retriever.search(query, top_k)
        )

# 使用性能优化器
optimizer = PerformanceOptimizer()
'''
    print("\n性能优化代码:")
    print(performance_optimization_code)


performance_optimization()  # Real-world application cases
#企业应用案例
def enterprise_case_studies():
"""
企业应用案例分析
"""
print("企业应用案例:")
case_studies = [
{
"行业": "电商平台",
"应用场景": "客服FAQ系统",
"规模": "10万+FAQ条目",
"技术方案": "BGE-large-zh + Milvus + GPT-4",
"效果": "客服效率提升60%,响应时间<200ms"
},
{
"行业": "金融服务",
"应用场景": "理财产品咨询",
"规模": "5万+产品问答",
"技术方案": "混合检索 + 金融领域微调模型",
"效果": "准确率92%,用户满意度4.5/5"
},
{
"行业": "教育机构",
"应用场景": "课程咨询问答",
"规模": "2万+课程FAQ",
"技术方案": "Sentence-BERT + FAISS + 本地LLM",
"效果": "7×24小时服务,成本降低40%"
},
{
"行业": "医疗健康",
"应用场景": "健康咨询助手",
"规模": "8万+医学问答",
"技术方案": "医疗领域专用模型 + 知识图谱",
"效果": "专业性强,安全性
