实战项目三:语义搜索与问答系统

项目概述

语义搜索与问答系统是现代NLP应用的核心组件:它通过向量检索技术在语义层面匹配用户意图,而不依赖关键词的字面精确匹配。本项目将构建一个完整的FAQ智能问答系统,支持自然语言提问并返回精准答案。

项目目标

def project_goals():
    """
    语义搜索与问答系统项目目标
    """
    goals = {
        "核心技术": [
            "语义向量化:将文本编码为高维向量",
            "向量检索:基于语义相似度搜索",
            "RAG架构:检索增强生成",
            "向量数据库:高效存储与检索"
        ],
        "性能指标": [
            "检索准确率 > 90%",
            "响应时间 < 500ms",
            "支持并发 > 100 QPS",
            "支持多种文档格式"
        ],
        "业务目标": [
            "提升客服效率",
            "降低人工成本",
            "提高用户满意度",
            "支持智能知识管理"
        ]
    }
    
    print("项目目标:")
    for category, items in goals.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  ✓ {item}")

project_goals()

核心技术栈

def technology_stack():
    """
    项目技术栈
    """
    stack = {
        "向量化模型": ["sentence-transformers", "BERT", "RoBERTa"],
        "向量数据库": ["FAISS", "Milvus", "Chroma", "Weaviate"],
        "后端框架": ["FastAPI", "uvicorn", "redis"],
        "前端框架": ["Streamlit", "React", "Vue"],
        "LLM集成": ["OpenAI", "Hugging Face", "本地模型"],
        "部署方案": ["Docker", "Kubernetes", "云服务"]
    }
    
    print("核心技术栈:")
    for category, components in stack.items():
        print(f"  {category}: {', '.join(components)}")

technology_stack()

系统架构设计

整体架构

def system_architecture():
    """
    系统架构设计
    """
    print("语义搜索与问答系统架构:")
    
    architecture_flow = """
    ┌─────────────────────────────────────────────────────────────────┐
    │                        用户界面层                                │
    │                    (Web UI / Mobile App)                       │
    └─────────────────────────────────────────────────────────────────┘

    ┌─────────────────────────────────────────────────────────────────┐
    │                        API网关层                                │
    │                    (FastAPI / Nginx)                          │
    └─────────────────────────────────────────────────────────────────┘

    ┌─────────────────────────────────────────────────────────────────┐
    │                      问答处理层                                 │
    │  ┌─────────────────┐    ┌─────────────────┐                   │
    │  │  查询处理模块     │    │  RAG处理模块    │                   │
    │  │  - 预处理       │    │  - 检索         │                   │
    │  │  - 清洗         │    │  - 生成         │                   │
    │  │  - 路由         │    │  - 重排         │                   │
    │  └─────────────────┘    └─────────────────┘                   │
    └─────────────────────────────────────────────────────────────────┘

    ┌─────────────────────────────────────────────────────────────────┐
    │                      存储层                                    │
    │  ┌─────────────────┐    ┌─────────────────┐                   │
    │  │  向量数据库      │    │  知识库         │                   │
    │  │  (FAISS/Milvus) │    │  (FAQ/文档)     │                   │
    │  │  - 向量存储      │    │  - 结构化数据    │                   │
    │  │  - 相似度检索    │    │  - 非结构化数据  │                   │
    │  └─────────────────┘    └─────────────────┘                   │
    └─────────────────────────────────────────────────────────────────┘

    ┌─────────────────────────────────────────────────────────────────┐
    │                      AI模型层                                   │
    │  ┌─────────────────┐    ┌─────────────────┐                   │
    │  │  嵌入模型       │    │  语言模型        │                   │
    │  │  (BERT/Sentence │    │  (GPT/LLaMA/   │                   │
    │  │   -Transformer) │    │   本地模型)     │                   │
    │  │  - 文本向量化   │    │  - 生成回答     │                   │
    │  └─────────────────┘    └─────────────────┘                   │
    └─────────────────────────────────────────────────────────────────┘
    """
    
    print(architecture_flow)

system_architecture()
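
在逐层展开实现之前,先用一个极简草图把上述架构的核心链路(嵌入模型 → 向量索引 → 语义检索 → 返回答案)串联起来。以下代码仅为示意,假设已安装 sentence-transformers 与 faiss-cpu,模型名与FAQ样例均为演示用途:

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

# 演示用的内存FAQ知识库
faqs = [
    {"question": "密码忘记了怎么办?", "answer": "请在登录页点击'忘记密码'重置。"},
    {"question": "如何修改个人信息?", "answer": "登录后进入'个人中心'编辑资料。"},
]

model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")

# 向量化并L2归一化,使内积等价于余弦相似度
embeddings = model.encode(
    [item["question"] for item in faqs], normalize_embeddings=True
).astype("float32")

index = faiss.IndexFlatIP(embeddings.shape[1])
index.add(embeddings)

def minimal_answer(query: str) -> str:
    q = model.encode([query], normalize_embeddings=True).astype("float32")
    scores, ids = index.search(q, 1)
    return faqs[ids[0][0]]["answer"]

print(minimal_answer("忘记密码了怎么找回?"))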

架构优势

def architecture_advantages():
    """
    架构优势分析
    """
    advantages = [
        "模块化设计:各组件职责清晰,便于维护",
        "可扩展性:支持水平扩展应对高并发",
        "灵活性:支持多种模型和数据库",
        "高性能:向量检索毫秒级响应",
        "准确性:语义理解超越关键词匹配",
        "可解释性:检索结果可追溯"
    ]
    
    print("架构优势:")
    for advantage in advantages:
        print(f"  ✓ {advantage}")

architecture_advantages()

数据准备与预处理

FAQ数据结构

def faq_data_structure():
    """
    FAQ数据结构设计
    """
    print("FAQ数据结构设计:")
    
    faq_example = {
        "id": "faq_001",
        "question": "密码忘记了怎么办?",
        "answer": "请访问登录页面,点击'忘记密码',输入注册邮箱,系统会发送重置链接到您的邮箱。或者联系客服人工处理。",
        "category": "账户",
        "tags": ["密码", "重置", "账户"],
        "created_at": "2024-01-01",
        "updated_at": "2024-01-01",
        "status": "active",
        "similarity_questions": [
            "密码忘记了怎么找回?",
            "忘记密码了怎么办?",
            "密码丢失如何重置?"
        ]
    }
    
    print("FAQ数据结构示例:")
    import json
    print(json.dumps(faq_example, ensure_ascii=False, indent=2))
    
    # 数据预处理代码
    data_preprocessing_code = """
    import pandas as pd
    import re
    import json
    from datetime import datetime
    
    class FAQPreprocessor:
        def __init__(self):
            self.stop_words = set(['的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这'])
        
        def clean_text(self, text):
            """
            文本清洗
            """
            # 去除特殊字符
            text = re.sub(r'[\\\\/:*?"<>|]', ' ', text)
            # 去除多余空白
            text = re.sub(r'\\s+', ' ', text).strip()
            # 去除HTML标签
            text = re.sub(r'<[^>]+>', '', text)
            return text
        
        def normalize_text(self, text):
            '''文本标准化'''
            # 全角标点统一为半角
            text = text.replace('？', '?').replace('！', '!').replace('，', ',')
            # 去除停用词(可选;中文需先分词,如jieba,按空格切分仅适用于已分词文本)
            words = [w for w in text.split() if w not in self.stop_words]
            return ' '.join(words)
        
        def prepare_faq_data(self, raw_data):
            """
            准备FAQ数据
            """
            processed_data = []
            
            for item in raw_data:
                processed_item = {
                    'id': item.get('id', f"faq_{len(processed_data)+1}"),
                    'question': self.clean_text(item['question']),
                    'answer': self.clean_text(item['answer']),
                    'category': item.get('category', 'general'),
                    'tags': item.get('tags', []),
                    'created_at': item.get('created_at', datetime.now().isoformat()),
                    'updated_at': item.get('updated_at', datetime.now().isoformat()),
                    'status': item.get('status', 'active'),
                    'similarity_questions': [
                        self.clean_text(q) for q in item.get('similarity_questions', [])
                    ]
                }
                
                # 构建完整搜索文本
                search_text = f"{processed_item['question']} {' '.join(processed_item['similarity_questions'])}"
                processed_item['search_text'] = search_text
                
                processed_data.append(processed_item)
            
            return processed_data
    
    # 示例数据准备
    raw_faq_data = [
        {
            "question": "密码忘记了怎么办?",
            "answer": "请访问登录页面,点击'忘记密码',输入注册邮箱,系统会发送重置链接到您的邮箱。或者联系客服人工处理。",
            "category": "账户",
            "tags": ["密码", "重置", "账户"]
        },
        {
            "question": "如何修改个人信息?",
            "answer": "登录后点击右上角头像 → '个人中心' → '编辑资料',即可修改昵称、头像、简介等信息。",
            "category": "账户",
            "tags": ["个人信息", "修改", "资料"]
        },
        # ... 更多FAQ条目
    ]
    
    preprocessor = FAQPreprocessor()
    processed_faq_data = preprocessor.prepare_faq_data(raw_faq_data)
    """
    
    print("\n数据预处理代码:")
    print(data_preprocessing_code)

faq_data_structure()

数据质量检查

def data_quality_check():
    """
    数据质量检查
    """
    print("数据质量检查:")
    
    quality_checks = [
        "完整性检查:确保必填字段不为空",
        "一致性检查:统一格式和标准",
        "准确性检查:验证答案正确性",
        "重复性检查:识别重复问答对",
        "相关性检查:确保问题答案匹配",
        "长度检查:过滤过长或过短内容"
    ]
    
    print("质量检查项目:")
    for check in quality_checks:
        print(f"  ✓ {check}")
    
    # 数据质量检查代码
    quality_check_code = """
    def validate_faq_data(faq_data):
        """
        验证FAQ数据质量
        """
        issues = []
        
        for i, item in enumerate(faq_data):
            # 检查必填字段
            if not item.get('question') or not item.get('answer'):
                issues.append(f"第{i+1}条:缺少问题或答案")
            
            # 检查长度
            if len(item.get('question', '')) < 5:
                issues.append(f"第{i+1}条:问题过短")
            
            if len(item.get('answer', '')) < 10:
                issues.append(f"第{i+1}条:答案过短")
            
            # 检查重复
            for j, other_item in enumerate(faq_data[i+1:], i+1):
                if item['question'] == other_item['question']:
                    issues.append(f"第{i+1}条和第{j+1}条:问题重复")
        
        return issues
    
    # 使用示例
    validation_issues = validate_faq_data(processed_faq_data)
    if validation_issues:
        print("发现以下问题:")
        for issue in validation_issues:
            print(f"  - {issue}")
    else:
        print("数据质量检查通过")
    """
    
    print("\n数据质量检查代码:")
    print(quality_check_code)

data_quality_check()

向量化与索引构建

嵌入模型选择

def embedding_model_selection():
    """
    嵌入模型选择与配置
    """
    print("嵌入模型选择:")
    
    models_comparison = [
        {
            "模型": "all-MiniLM-L6-v2",
            "维度": 384,
            "大小": "~80MB",
            "速度": "快",
            "中文支持": "一般",
            "适用场景": "轻量级应用"
        },
        {
            "模型": "paraphrase-multilingual-MiniLM-L12-v2",
            "维度": 768,
            "大小": "~200MB", 
            "速度": "中等",
            "中文支持": "好",
            "适用场景": "多语言应用"
        },
        {
            "模型": "bge-large-zh-v1.5",
            "维度": 1024,
            "大小": "~1.3GB",
            "速度": "较慢",
            "中文支持": "优秀",
            "适用场景": "中文专用"
        },
        {
            "模型": "text-embedding-ada-002",
            "维度": 1536,
            "大小": "云端",
            "速度": "快",
            "中文支持": "好",
            "适用场景": "商业应用"
        }
    ]
    
    print(f"{'模型':<35} {'维度':<8} {'大小':<10} {'速度':<8} {'中文支持':<10} {'适用场景':<15}")
    print("-" * 95)
    for model in models_comparison:
        print(f"{model['模型']:<35} {model['维度']:<8} {model['大小']:<10} {model['速度']:<8} {model['中文支持']:<10} {model['适用场景']:<15}")
    
    # 嵌入模型实现代码
    embedding_implementation = """
    from sentence_transformers import SentenceTransformer
    import numpy as np
    import torch
    
    class FAQEmbedder:
        def __init__(self, model_name='paraphrase-multilingual-MiniLM-L12-v2'):
            self.model_name = model_name
            self.model = SentenceTransformer(model_name)
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.model.to(self.device)
        
        def encode_texts(self, texts, batch_size=32):
            """
            批量编码文本
            """
            embeddings = []
            
            for i in range(0, len(texts), batch_size):
                batch = texts[i:i+batch_size]
                batch_embeddings = self.model.encode(
                    batch,
                    convert_to_tensor=True,
                    show_progress_bar=True
                )
                embeddings.append(batch_embeddings.cpu().numpy())
            
            return np.vstack(embeddings)
        
        def encode_single_text(self, text):
            """
            编码单个文本
            """
            embedding = self.model.encode([text], convert_to_tensor=True)
            return embedding.cpu().numpy().flatten()
    
    # 使用示例
    embedder = FAQEmbedder('paraphrase-multilingual-MiniLM-L12-v2')
    
    # 编码FAQ问题
    questions = [item['question'] for item in processed_faq_data]
    question_embeddings = embedder.encode_texts(questions)
    
    print(f"编码了{len(questions)}个问题,向量维度:{question_embeddings.shape[1]}")
    """
    
    print("\n嵌入模型实现代码:")
    print(embedding_implementation)

embedding_model_selection()

向量索引构建

def vector_index_building():
    """
    向量索引构建
    """
    print("向量索引构建:")
    
    indexing_methods = [
        {
            "方法": "FAISS IndexFlatIP",
            "特点": "精确搜索,适合小数据集",
            "性能": "查询快,内存占用大",
            "适用": "小规模FAQ系统"
        },
        {
            "方法": "FAISS IndexIVFFlat", 
            "特点": "近似搜索,适合中等数据集",
            "性能": "平衡速度和精度",
            "适用": "中等规模系统"
        },
        {
            "方法": "FAISS IndexIVFPQ",
            "特点": "量化索引,节省内存",
            "性能": "大幅节省内存",
            "适用": "大规模系统"
        },
        {
            "方法": "Milvus",
            "特点": "企业级向量数据库",
            "性能": "高并发,分布式",
            "适用": "生产环境"
        }
    ]
    
    for method in indexing_methods:
        print(f"\n{method['方法']}:")
        print(f"  特点: {method['特点']}")
        print(f"  性能: {method['性能']}")
        print(f"  适用: {method['适用']}")
    
    # 索引构建代码
    index_building_code = """
    import faiss
    import numpy as np
    import pickle
    import os
    
    class VectorIndexBuilder:
        def __init__(self, dimension=768):
            self.dimension = dimension
        
        def build_faiss_index(self, embeddings, index_type='flat'):
            """
            构建FAISS索引
            """
            embeddings = embeddings.astype('float32')
            
            if index_type == 'flat':
                # 精确搜索索引
                index = faiss.IndexFlatIP(self.dimension)
            elif index_type == 'ivf':
                # IVF索引(需要训练)
                nlist = max(1, min(100, len(embeddings) // 10))  # 聚类中心数
                quantizer = faiss.IndexFlatIP(self.dimension)
                # IndexIVFFlat默认使用L2度量,这里显式指定内积以配合归一化向量
                index = faiss.IndexIVFFlat(
                    quantizer, self.dimension, nlist, faiss.METRIC_INNER_PRODUCT
                )
                
                # 训练索引
                if len(embeddings) > nlist:
                    index.train(embeddings)
            elif index_type == 'pq':
                # 乘积量化索引
                nlist = max(1, min(100, len(embeddings) // 10))
                m = self.dimension // 8  # 子空间数(需整除维度)
                index = faiss.IndexIVFPQ(
                    faiss.IndexFlatIP(self.dimension),
                    self.dimension,
                    nlist,
                    m,
                    8,  # 每个子向量的比特数
                    faiss.METRIC_INNER_PRODUCT
                )
                if len(embeddings) > nlist:
                    index.train(embeddings)
            
            # 添加向量
            index.add(embeddings)
            return index
        
        def normalize_embeddings(self, embeddings):
            """
            L2归一化(用于余弦相似度)
            """
            norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
            return embeddings / (norms + 1e-8)
        
        def save_index(self, index, filepath):
            """
            保存索引
            """
            faiss.write_index(index, filepath)
        
        def load_index(self, filepath):
            """
            加载索引
            """
            return faiss.read_index(filepath)
    
    # 构建索引
    indexer = VectorIndexBuilder(dimension=question_embeddings.shape[1])
    
    # 归一化向量(用于余弦相似度)
    normalized_embeddings = indexer.normalize_embeddings(question_embeddings)
    
    # 构建索引
    faq_index = indexer.build_faiss_index(normalized_embeddings, index_type='flat')
    
    # 保存索引和FAQ数据
    indexer.save_index(faq_index, 'faq_vectors.index')
    with open('faq_data.pkl', 'wb') as f:
        pickle.dump(processed_faq_data, f)
    
    print(f"向量索引构建完成,包含{faq_index.ntotal}个向量")
    """
    
    print("\n索引构建代码:")
    print(index_building_code)

vector_index_building()
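
为了直观感受上述几种索引在速度与精度上的取舍,下面给出一个基于随机向量的小型对比草图。数据规模、nlist、nprobe 等参数均为演示假设,实际结果因机器与数据分布而异:

import faiss
import numpy as np
import time

d, n, nq, k = 384, 50000, 100, 5
rng = np.random.default_rng(0)
xb = rng.standard_normal((n, d)).astype("float32")
xq = rng.standard_normal((nq, d)).astype("float32")
faiss.normalize_L2(xb)
faiss.normalize_L2(xq)

# 精确索引作为基准
flat = faiss.IndexFlatIP(d)
flat.add(xb)

# IVF近似索引(显式指定内积度量)
nlist = 128
quantizer = faiss.IndexFlatIP(d)
ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_INNER_PRODUCT)
ivf.train(xb)
ivf.add(xb)
ivf.nprobe = 8  # 增大nprobe可提高召回,但降低速度

t0 = time.time(); _, gt = flat.search(xq, k); t_flat = time.time() - t0
t0 = time.time(); _, approx = ivf.search(xq, k); t_ivf = time.time() - t0

# 以精确检索结果为真值计算IVF的召回率
recall = np.mean([len(set(gt[i]) & set(approx[i])) / k for i in range(nq)])
print(f"Flat耗时: {t_flat:.3f}s  IVF耗时: {t_ivf:.3f}s  IVF召回@{k}: {recall:.2%}")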

语义检索实现

检索算法实现

def semantic_retrieval_implementation():
    """
    语义检索算法实现
    """
    print("语义检索算法实现:")
    
    retrieval_algorithms = [
        "余弦相似度:最常用的语义相似度计算",
        "欧氏距离:适用于归一化向量",
        "内积相似度:FAISS默认的相似度计算",
        "Jaccard相似度:适用于稀疏向量"
    ]
    
    print("检索算法:")
    for algo in retrieval_algorithms:
        print(f"  ✓ {algo}")
    
    # 语义检索实现代码
    retrieval_implementation = """
    import faiss
    import numpy as np
    import pickle
    from typing import List, Dict, Tuple
    import time
    
    class SemanticRetriever:
        def __init__(self, index_path: str, data_path: str, embedder: FAQEmbedder):
            self.index = faiss.read_index(index_path)
            with open(data_path, 'rb') as f:
                self.faq_data = pickle.load(f)
            self.embedder = embedder
        
        def search(self, query: str, top_k: int = 5, threshold: float = 0.3) -> Dict:
            '''语义检索'''
            start_time = time.time()
            
            # 编码查询
            query_embedding = self.embedder.encode_single_text(query)
            query_embedding = query_embedding.astype('float32')
            query_embedding = query_embedding / (np.linalg.norm(query_embedding) + 1e-8)  # 归一化
            
            # 检索
            scores, indices = self.index.search(query_embedding.reshape(1, -1), top_k)
            
            results = []
            for score, idx in zip(scores[0], indices[0]):
                if idx >= 0 and score >= threshold:  # 有效结果且满足阈值
                    faq_item = self.faq_data[idx]
                    results.append({
                        'id': faq_item['id'],
                        'question': faq_item['question'],
                        'answer': faq_item['answer'],
                        'category': faq_item['category'],
                        'score': float(score),
                        'index': int(idx)
                    })
            
            search_time = time.time() - start_time
            
            return {
                'results': results,
                'search_time': search_time,
                'total_found': len(results)
            }
        
        def batch_search(self, queries: List[str], top_k: int = 5) -> List[List[Dict]]:
            """
            批量检索
            """
            query_embeddings = self.embedder.encode_texts(queries)
            query_embeddings = query_embeddings.astype('float32')
            
            # 归一化
            norms = np.linalg.norm(query_embeddings, axis=1, keepdims=True)
            query_embeddings = query_embeddings / (norms + 1e-8)
            
            scores, indices = self.index.search(query_embeddings, top_k)
            
            all_results = []
            for query_idx in range(len(queries)):
                results = []
                for score, idx in zip(scores[query_idx], indices[query_idx]):
                    if idx >= 0:
                        faq_item = self.faq_data[idx]
                        results.append({
                            'question': faq_item['question'],
                            'answer': faq_item['answer'],
                            'score': float(score)
                        })
                all_results.append(results)
            
            return all_results
    
    # 使用示例
    retriever = SemanticRetriever('faq_vectors.index', 'faq_data.pkl', embedder)
    
    # 单次检索
    search_result = retriever.search("密码忘记了怎么办?", top_k=3)
    print("检索结果:")
    for result in search_result['results']:
        print(f"  问题: {result['question']}")
        print(f"  相似度: {result['score']:.4f}")
        print(f"  答案: {result['answer'][:50]}...")
        print()
    """
    
    print("语义检索实现代码:")
    print(retrieval_implementation)

semantic_retrieval_implementation()
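
上面的实现依赖一个关键事实:向量经L2归一化后,内积与余弦相似度在数值上等价,因此可以直接使用FAISS的内积索引。下面用几行numpy验证这一等价性:

import numpy as np

rng = np.random.default_rng(42)
docs = rng.standard_normal((5, 768))
query = rng.standard_normal(768)

# 直接按定义计算余弦相似度
cosine = docs @ query / (np.linalg.norm(docs, axis=1) * np.linalg.norm(query))

# 先L2归一化,再计算内积
docs_n = docs / np.linalg.norm(docs, axis=1, keepdims=True)
query_n = query / np.linalg.norm(query)
inner = docs_n @ query_n

print(np.allclose(cosine, inner))  # True:两者在数值误差内一致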

检索优化策略

def retrieval_optimization():
    """
    检索优化策略
    """
    print("检索优化策略:")
    
    optimization_strategies = [
        {
            "策略": "多字段检索",
            "描述": "同时检索问题、答案、标签等多个字段",
            "效果": "提高检索覆盖率"
        },
        {
            "策略": "查询扩展",
            "描述": "使用同义词、相关词扩展查询",
            "效果": "提高召回率"
        },
        {
            "策略": "重排序",
            "描述": "对初步检索结果进行重排序",
            "效果": "提高精确率"
        },
        {
            "策略": "缓存机制",
            "描述": "缓存热门查询结果",
            "效果": "提升响应速度"
        },
        {
            "策略": "分层检索",
            "描述": "先粗筛再精排",
            "效果": "平衡效率和准确性"
        }
    ]
    
    for strategy in optimization_strategies:
        print(f"\n{strategy['策略']}:")
        print(f"  描述: {strategy['描述']}")
        print(f"  效果: {strategy['效果']}")
    
    # 优化检索代码
    optimized_retrieval_code = """
    from collections import defaultdict
    from typing import List, Dict
    
    class OptimizedRetriever(SemanticRetriever):
        def __init__(self, index_path: str, data_path: str, embedder: FAQEmbedder):
            super().__init__(index_path, data_path, embedder)
            self.query_cache = {}
            self.cache_size = 1000
        
        def hybrid_search(self, query: str, top_k: int = 5) -> List[Dict]:
            """
            混合检索:语义+关键词
            """
            # 语义检索
            semantic_results = self.search(query, top_k * 2)['results']
            
            # 关键词匹配(简单实现)
            keyword_results = self.keyword_match(query, top_k * 2)
            
            # 融合结果
            combined_scores = defaultdict(float)
            
            # 语义分数
            for i, result in enumerate(semantic_results):
                combined_scores[result['id']] += result['score'] * 0.7  # 语义权重
            
            # 关键词分数
            for i, result in enumerate(keyword_results):
                combined_scores[result['id']] += result['keyword_score'] * 0.3  # 关键词权重
            
            # 排序并返回top_k
            sorted_results = sorted(
                combined_scores.items(), 
                key=lambda x: x[1], 
                reverse=True
            )[:top_k]
            
            final_results = []
            for faq_id, score in sorted_results:
                faq_item = next(item for item in self.faq_data if item['id'] == faq_id)
                final_results.append({
                    'question': faq_item['question'],
                    'answer': faq_item['answer'],
                    'category': faq_item['category'],
                    'score': score,
                    'id': faq_id
                })
            
            return final_results
        
        def keyword_match(self, query: str, top_k: int) -> List[Dict]:
            """
            简单关键词匹配
            """
            import re
            from difflib import SequenceMatcher
            
            query_words = set(re.findall(r'\\w+', query.lower()))
            results = []
            
            for faq_item in self.faq_data:
                text = f"{faq_item['question']} {faq_item['answer']}".lower()
                text_words = set(re.findall(r'\\w+', text))
                
                # 计算关键词匹配度
                common_words = query_words.intersection(text_words)
                keyword_score = len(common_words) / max(len(query_words), 1)
                
                # 计算文本相似度
                similarity = SequenceMatcher(None, query.lower(), text).ratio()
                
                total_score = keyword_score * 0.6 + similarity * 0.4
                
                results.append({
                    'id': faq_item['id'],
                    'question': faq_item['question'],
                    'answer': faq_item['answer'],
                    'keyword_score': total_score
                })
            
            # 按分数排序
            results.sort(key=lambda x: x['keyword_score'], reverse=True)
            return results[:top_k]
    
    # 使用优化检索
    optimized_retriever = OptimizedRetriever('faq_vectors.index', 'faq_data.pkl', embedder)
    hybrid_results = optimized_retriever.hybrid_search("密码忘记了怎么办?", top_k=3)
    """
    
    print("\n优化检索代码:")
    print(optimized_retrieval_code)

retrieval_optimization()
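
上面列出的"重排序"策略可以用交叉编码器实现:先用向量检索粗筛出较多候选,再对每个(查询, 候选问题)对逐一精排。下面是一个基于 sentence-transformers CrossEncoder 的草图,假设上文的 OptimizedRetriever 已在作用域中;示例模型名为英文通用精排模型,中文场景可换用 bge-reranker 等:

from sentence_transformers import CrossEncoder

class RerankingRetriever(OptimizedRetriever):
    def __init__(self, index_path, data_path, embedder,
                 rerank_model='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        super().__init__(index_path, data_path, embedder)
        self.reranker = CrossEncoder(rerank_model)

    def search_with_rerank(self, query: str, top_k: int = 3, recall_k: int = 20):
        # 第一阶段:向量检索粗筛recall_k条候选
        candidates = self.search(query, top_k=recall_k)['results']
        if not candidates:
            return []
        # 第二阶段:交叉编码器对(查询, 候选问题)逐对打分
        pairs = [(query, c['question']) for c in candidates]
        rerank_scores = self.reranker.predict(pairs)
        for c, s in zip(candidates, rerank_scores):
            c['rerank_score'] = float(s)
        # 按精排分数重新排序后截取top_k
        candidates.sort(key=lambda c: c['rerank_score'], reverse=True)
        return candidates[:top_k]

交叉编码器同时读取查询与候选文本,精度通常高于双塔式向量检索,但无法离线建索引,因此只适合放在粗筛之后的小候选集上。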

RAG问答系统

RAG架构实现

def rag_architecture_implementation():
    """
    RAG架构实现
    """
    print("RAG(检索增强生成)架构实现:")
    
    rag_process = """
    RAG系统工作流程:
    
    1. 查询理解
       - 解析用户问题意图
       - 识别关键实体和概念
       - 可能的查询改写
    
    2. 信息检索
       - 使用查询检索相关文档
       - 获取top-k最相关的结果
       - 对结果进行相关性评分
    
    3. 上下文构建
       - 将检索结果整合为上下文
       - 过滤不相关信息
       - 格式化为模型输入
    
    4. 答案生成
       - 使用LLM基于上下文生成答案
       - 保持事实准确性
       - 生成可解释的回答
    
    5. 后处理
       - 答案验证和优化
       - 引用来源标注
       - 质量评估
    """
    
    print(rag_process)
    
    # RAG系统实现代码
    rag_implementation = """
    from openai import OpenAI  # llm_client 可由 OpenAI(api_key=...) 创建
    from typing import List, Dict
    
    class RAGSystem:
        def __init__(self, retriever: SemanticRetriever, llm_client=None):
            self.retriever = retriever
            self.llm_client = llm_client  # 可选的LLM客户端
            self.max_context_length = 3000  # 最大上下文长度
        
        def build_context(self, query: str, retrieved_docs: List[Dict]) -> str:
            """
            构建上下文
            """
            context_parts = []
            
            for i, doc in enumerate(retrieved_docs, 1):
                doc_context = f"""
                文档 {i}:
                问题: {doc['question']}
                答案: {doc['answer']}
                相似度: {doc['score']:.4f}
                """.strip()
                
                context_parts.append(doc_context)
            
            full_context = "\\n\\n".join(context_parts)
            
            # 截断过长的上下文
            if len(full_context) > self.max_context_length:
                full_context = full_context[:self.max_context_length]
                # 确保不截断文档边界
                last_doc_end = full_context.rfind('\\n\\n')
                if last_doc_end > 0:
                    full_context = full_context[:last_doc_end]
            
            return full_context
        
        def generate_answer(self, query: str, context: str) -> Dict:
            """
            生成答案
            """
            if self.llm_client:
                # 使用外部LLM
                prompt = f'''
                你是一个智能客服助手。请根据以下知识库信息回答用户问题。

                知识库信息:
                {context}

                用户问题:{query}

                请遵循以下要求:
                1. 如果知识库中有相关信息,请基于这些信息给出准确回答
                2. 如果没有相关信息,请礼貌说明无法找到答案
                3. 回答应简洁明了,控制在200字以内
                4. 如有必要,可说明信息来源
                5. 保持客观中立的语气
                '''
                
                try:
                    response = self.llm_client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=500,
                        temperature=0.3
                    )
                    
                    answer = response.choices[0].message.content
                    
                    return {
                        'answer': answer,
                        'status': 'success',
                        'generated_by': 'llm'
                    }
                except Exception as e:
                    return {
                        'answer': '抱歉,答案生成遇到问题,请稍后重试。',
                        'status': 'error',
                        'error': str(e),
                        'generated_by': 'fallback'
                    }
            else:
                # 使用基于规则的方法作为备选
                return self.rule_based_answer(query, context)
        
        def rule_based_answer(self, query: str, context: str) -> Dict:
            """
            基于规则的答案生成(备选方案)
            """
            if not context.strip():
                return {
                    'answer': '抱歉,没有找到相关信息。您可以尝试重新表述问题或联系客服。',
                    'status': 'not_found',
                    'generated_by': 'rule_based'
                }
            
            # 简单的规则:返回最相关的文档答案
            lines = context.split('\\n\\n')
            if lines:
                # 提取第一个文档的答案
                for line in lines:
                    if '答案:' in line:
                        answer = line.split('答案:')[1].strip()
                        return {
                            'answer': f'根据知识库信息:{answer}',
                            'status': 'success',
                            'generated_by': 'rule_based'
                        }
            
            return {
                'answer': '找到了相关信息,但无法生成确切答案。请提供更多细节或联系客服。',
                'status': 'partial',
                'generated_by': 'rule_based'
            }
        
        def query(self, user_query: str, top_k: int = 3, threshold: float = 0.3) -> Dict:
            """
            完整的RAG查询流程
            """
            import time
            start_time = time.time()
            
            # 1. 语义检索
            retrieval_result = self.retriever.search(user_query, top_k, threshold)
            
            # 2. 构建上下文
            context = self.build_context(user_query, retrieval_result['results'])
            
            # 3. 生成答案
            generation_result = self.generate_answer(user_query, context)
            
            total_time = time.time() - start_time
            
            return {
                'query': user_query,
                'answer': generation_result['answer'],
                'status': generation_result['status'],
                'retrieved_docs': retrieval_result['results'],
                'context_used': context,
                'response_time': total_time,
                'retrieval_time': retrieval_result['search_time'],
                'generated_by': generation_result.get('generated_by', 'unknown')
            }
    
    # 使用RAG系统
    rag_system = RAGSystem(optimized_retriever)
    
    # 示例查询
    result = rag_system.query("密码忘记了怎么办?", top_k=3)
    print(f"答案: {result['answer']}")
    print(f"检索到 {len(result['retrieved_docs'])} 个相关文档")
    print(f"响应时间: {result['response_time']:.4f}秒")
    """
    
    print("RAG系统实现代码:")
    print(rag_implementation)

rag_architecture_implementation()

RAG优化策略

def rag_optimization_strategies():
    """
    RAG系统优化策略
    """
    print("RAG系统优化策略:")
    
    optimization_strategies = [
        {
            "策略": "查询改写",
            "描述": "优化用户查询以提高检索效果",
            "技术": "使用LLM重写查询"
        },
        {
            "策略": "上下文压缩",
            "描述": "压缩长上下文以适应模型限制",
            "技术": "抽取关键句子或摘要"
        },
        {
            "策略": "多跳检索",
            "描述": "进行多轮检索获取更多信息",
            "技术": "基于第一轮结果生成新查询"
        },
        {
            "策略": "自我一致性",
            "描述": "生成多个答案并选择最一致的",
            "技术": "多次生成取多数票"
        },
        {
            "策略": "置信度评估",
            "描述": "评估答案的可信度",
            "技术": "基于检索分数和生成质量"
        }
    ]
    
    for strategy in optimization_strategies:
        print(f"\n{strategy['策略']}:")
        print(f"  描述: {strategy['描述']}")
        print(f"  技术: {strategy['技术']}")
    
    # RAG优化代码
    rag_optimization_code = """
    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    from typing import List, Dict
    
    class AdvancedRAGSystem(RAGSystem):
        def __init__(self, retriever: SemanticRetriever, llm_client=None):
            super().__init__(retriever, llm_client)
            self.executor = ThreadPoolExecutor(max_workers=4)
        
        async def query_rewrite(self, original_query: str) -> str:
            """
            查询改写
            """
            if self.llm_client:
                rewrite_prompt = f"""
                请将以下用户查询改写为更清晰、更具体的搜索查询,保持原意不变:
                
                原查询:{original_query}
                
                改写后的查询:
                """
                
                try:
                    response = self.llm_client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=[{"role": "user", "content": rewrite_prompt}],
                        max_tokens=100
                    )
                    return response.choices[0].message.content.strip()
                except Exception:
                    return original_query
            return original_query
        
        def compress_context(self, context: str, max_length: int = 2000) -> str:
            """
            上下文压缩
            """
            if len(context) <= max_length:
                return context
            
            # 简单的压缩:保留最重要的部分
            sentences = context.split('. ')
            compressed_parts = []
            current_length = 0
            
            for sentence in sentences:
                if current_length + len(sentence) <= max_length:
                    compressed_parts.append(sentence)
                    current_length += len(sentence)
                else:
                    break
            
            return '. '.join(compressed_parts) + '...'
        
        def multi_hop_retrieve(self, query: str, hops: int = 2) -> List[Dict]:
            """
            多跳检索
            """
            all_results = []
            current_query = query
            
            for hop in range(hops):
                hop_results = self.retriever.search(current_query, top_k=2)['results']
                all_results.extend(hop_results)
                
                if hop < hops - 1 and hop_results:
                    # 基于结果生成新查询
                    context = self.build_context(current_query, hop_results[:1])
                    follow_up_prompt = f"""
                    基于以下信息,生成一个更具体的后续查询来获取更多信息:
                    
                    上下文:{context}
                    
                    原查询:{current_query}
                    
                    后续查询:
                    """
                    
                    if self.llm_client:
                        try:
                            response = self.llm_client.chat.completions.create(
                                model="gpt-4o-mini",
                                messages=[{"role": "user", "content": follow_up_prompt}],
                                max_tokens=100
                            )
                            current_query = response.choices[0].message.content.strip()
                        except Exception:
                            break
            
            # 去重并按相关性排序
            seen_ids = set()
            unique_results = []
            for result in all_results:
                if result['id'] not in seen_ids:
                    unique_results.append(result)
                    seen_ids.add(result['id'])
            
            return unique_results[:5]  # 返回top-5
        
        async def query_with_optimizations(self, user_query: str) -> Dict:
            """
            带优化的查询
            """
            start_time = time.time()
            
            # 1. 查询改写
            rewritten_query = await self.query_rewrite(user_query)
            
            # 2. 多跳检索
            retrieved_docs = self.multi_hop_retrieve(rewritten_query)
            
            # 3. 构建并压缩上下文
            context = self.build_context(rewritten_query, retrieved_docs)
            compressed_context = self.compress_context(context)
            
            # 4. 生成答案
            generation_result = self.generate_answer(rewritten_query, compressed_context)
            
            total_time = time.time() - start_time
            
            return {
                'original_query': user_query,
                'rewritten_query': rewritten_query,
                'answer': generation_result['answer'],
                'status': generation_result['status'],
                'retrieved_docs': retrieved_docs,
                'context_used': compressed_context,
                'response_time': total_time,
                'generated_by': generation_result.get('generated_by', 'unknown')
            }
    
    # 使用优化的RAG系统
    advanced_rag = AdvancedRAGSystem(optimized_retriever)
    
    # 异步方法需在事件循环中调用,例如:
    # result = asyncio.run(advanced_rag.query_with_optimizations("密码忘记了怎么办?"))
    """
    
    print("\nRAG优化代码:")
    print(rag_optimization_code)

rag_optimization_strategies()
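
上面列出的"置信度评估"策略可以先从检索分数做一个简单版本:综合最高相似度与最优/次优结果的分差给出置信度,低置信度时转人工或提示用户补充信息。以下阈值与权重均为经验假设值,需结合业务数据标定:

def estimate_confidence(retrieved_docs, min_score=0.5, strong_score=0.75):
    # retrieved_docs为按相似度降序排列的检索结果列表
    if not retrieved_docs:
        return {'confidence': 0.0, 'level': 'none'}
    top = retrieved_docs[0]['score']
    # 最优与次优的分差越大,歧义越小
    margin = top - retrieved_docs[1]['score'] if len(retrieved_docs) > 1 else top
    confidence = 0.7 * top + 0.3 * min(margin * 2, 1.0)
    if top >= strong_score:
        level = 'high'
    elif top >= min_score:
        level = 'medium'
    else:
        level = 'low'  # 低置信度:建议转人工或要求用户澄清
    return {'confidence': round(confidence, 3), 'level': level}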

API服务部署

FastAPI服务实现

def fastapi_deployment():
    """
    FastAPI服务部署实现
    """
    print("FastAPI服务部署:")
    
    fastapi_implementation = """
    from fastapi import FastAPI, HTTPException, BackgroundTasks
    from pydantic import BaseModel
    from typing import List, Optional, Dict, Any
    import time
    import logging
    from uuid import uuid4
    
    app = FastAPI(
        title="语义搜索与问答系统API",
        description="基于RAG架构的智能问答系统",
        version="1.0.0"
    )
    
    # 请求模型
    class QueryRequest(BaseModel):
        query: str
        top_k: Optional[int] = 3
        threshold: Optional[float] = 0.3
        use_hybrid: Optional[bool] = False
        include_sources: Optional[bool] = True
    
    class FeedbackRequest(BaseModel):
        query_id: str
        query: str
        answer: str
        rating: int  # 1-5
        helpful: bool
        comment: Optional[str] = None
    
    # 响应模型
    class QueryResponse(BaseModel):
        query_id: str
        query: str
        answer: str
        sources: Optional[List[Dict]] = None
        response_time: float
        retrieval_time: Optional[float] = None
        status: str
        debug_info: Optional[Dict] = None
    
    # 全局RAG系统实例(在实际部署中应使用依赖注入)
    rag_system = None  # 初始化时赋值
    
    @app.on_event('startup')
    async def startup_event():
        global rag_system
        # 初始化RAG系统
        rag_system = AdvancedRAGSystem(optimized_retriever)
        logging.info("RAG系统初始化完成")
    
    @app.post("/query", response_model=QueryResponse)
    async def query_endpoint(request: QueryRequest):
        """
        问答查询接口
        """
        start_time = time.time()
        query_id = str(uuid4())
        
        try:
            logging.info(f"查询 {query_id}: {request.query}")
            
            if request.use_hybrid:
                # 使用混合检索
                results = optimized_retriever.hybrid_search(
                    request.query, 
                    top_k=request.top_k
                )
                
                # 构建上下文并生成答案
                context = rag_system.build_context(request.query, results)
                generation_result = rag_system.generate_answer(request.query, context)
                
                response_data = {
                    'query_id': query_id,
                    'query': request.query,
                    'answer': generation_result['answer'],
                    'sources': results if request.include_sources else None,
                    'response_time': time.time() - start_time,
                    'status': generation_result['status'],
                    'debug_info': {
                        'method': 'hybrid',
                        'generated_by': generation_result.get('generated_by', 'unknown')
                    }
                }
            else:
                # 使用标准RAG流程
                result = rag_system.query(
                    request.query,
                    top_k=request.top_k,
                    threshold=request.threshold
                )
                
                response_data = {
                    'query_id': query_id,
                    'query': request.query,
                    'answer': result['answer'],
                    'sources': result['retrieved_docs'] if request.include_sources else None,
                    'response_time': result['response_time'],
                    'retrieval_time': result.get('retrieval_time'),
                    'status': result['status'],
                    'debug_info': {
                        'method': 'standard_rag',
                        'generated_by': result['generated_by']
                    }
                }
            
            logging.info(f"查询 {query_id} 完成,耗时: {response_data['response_time']:.4f}s")
            return response_data
        
        except Exception as e:
            logging.error(f"查询 {query_id} 失败: {str(e)}")
            raise HTTPException(status_code=500, detail=f"查询处理失败: {str(e)}")
    
    @app.post("/search")
    async def search_endpoint(request: QueryRequest):
        """
        仅检索接口
        """
        start_time = time.time()
        
        try:
            if request.use_hybrid:
                results = optimized_retriever.hybrid_search(
                    request.query, 
                    top_k=request.top_k
                )
            else:
                search_result = optimized_retriever.search(
                    request.query, 
                    request.top_k, 
                    request.threshold
                )
                results = search_result['results']
            
            return {
                'query': request.query,
                'results': results,
                'count': len(results),
                'search_time': time.time() - start_time
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"检索失败: {str(e)}")
    
    @app.post("/feedback")
    async def feedback_endpoint(request: FeedbackRequest):
        """
        反馈接口
        """
        # 这里可以保存反馈到数据库,用于后续模型优化
        feedback_data = {
            'query_id': request.query_id,
            'rating': request.rating,
            'helpful': request.helpful,
            'comment': request.comment,
            'timestamp': time.time()
        }
        
        logging.info(f"收到反馈: {feedback_data}")
        
        # 可以异步处理反馈数据
        # await process_feedback_async(feedback_data)
        
        return {"status": "received", "message": "反馈已接收"}
    
    @app.get("/health")
    async def health_check():
        """
        健康检查接口
        """
        return {
            "status": "healthy",
            "timestamp": time.time(),
            "model_loaded": rag_system is not None
        }
    
    @app.get("/stats")
    async def get_stats():
        """
        系统统计信息
        """
        # 这里可以返回实际的系统统计信息
        return {
            "total_queries": 0,  # 从数据库或缓存获取
            "avg_response_time": 0.0,
            "success_rate": 100.0,
            "uptime": time.time()  # 实际运行时间
        }
    
    # 启动命令: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
    """
    
    print("FastAPI服务实现代码:")
    print(fastapi_implementation)

fastapi_deployment()
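
服务启动后,可以用几行代码快速验证 /query 接口的请求与响应格式(假设服务运行在本地8000端口):

import requests

resp = requests.post(
    "http://localhost:8000/query",
    json={"query": "密码忘记了怎么办?", "top_k": 3, "include_sources": True},
    timeout=10,
)
resp.raise_for_status()
data = resp.json()
print("答案:", data["answer"])
for src in data.get("sources") or []:
    print(f"  来源: {src['question']} (相似度 {src['score']:.3f})")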

Docker部署配置

def docker_deployment_config():
    """
    Docker部署配置
    """
    print("Docker部署配置:")
    
    dockerfile_content = """
    # Dockerfile
    FROM python:3.9-slim
    
    WORKDIR /app
    
    # 安装系统依赖
    RUN apt-get update \\
        && apt-get install -y gcc g++ build-essential \\
        && rm -rf /var/lib/apt/lists/*
    
    # 复制依赖文件
    COPY requirements.txt .
    
    # 安装Python依赖
    RUN pip install --no-cache-dir -r requirements.txt
    
    # 复制应用代码
    COPY . .
    
    # 创建模型缓存目录
    RUN mkdir -p /root/.cache/huggingface
    
    # 暴露端口
    EXPOSE 8000
    
    # 启动命令
    CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
    """
    
    requirements_content = """
    # requirements.txt
    fastapi==0.104.1
    uvicorn[standard]==0.24.0
    sentence-transformers==2.2.2
    faiss-cpu==1.7.4
    torch==2.1.0
    transformers==4.35.0
    pandas==2.1.0
    numpy==1.24.3
    openai==1.3.0
    pydantic==2.4.2
    python-multipart==0.0.6
    redis==5.0.1
    """
    
    docker_compose_content = """
    # docker-compose.yml
    version: '3.8'
    
    services:
      rag-service:
        build: .
        ports:
          - "8000:8000"
        environment:
          - OPENAI_API_KEY=${OPENAI_API_KEY}
          - TRANSFORMERS_CACHE=/root/.cache/huggingface
          - HF_HOME=/root/.cache/huggingface
        volumes:
          - ./models:/app/models
          - ./data:/app/data
          - ~/.cache/huggingface:/root/.cache/huggingface
        deploy:
          resources:
            limits:
              memory: 8G
              cpus: '4'
        restart: unless-stopped
        depends_on:
          - redis
        healthcheck:
          test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
          interval: 30s
          timeout: 10s
          retries: 3
          start_period: 60s
    
      redis:
        image: redis:7-alpine
        ports:
          - "6379:6379"
        volumes:
          - redis_data:/data
        command: redis-server --appendonly yes
        restart: unless-stopped
    
    volumes:
      redis_data:
    """
    
    print("Dockerfile:")
    print(dockerfile_content)
    print("\nrequirements.txt:")
    print(requirements_content)
    print("\ndocker-compose.yml:")
    print(docker_compose_content)

docker_deployment_config()

前端界面开发

Streamlit前端实现

def streamlit_frontend():
    """
    Streamlit前端界面实现
    """
    print("Streamlit前端界面:")
    
    streamlit_app_code = """
    import streamlit as st
    import requests
    import json
    import time
    from datetime import datetime
    
    # 页面配置
    st.set_page_config(
        page_title="FAQ智能问答系统",
        page_icon="🤖",
        layout="wide"
    )
    
    # 初始化session state
    if 'conversation_history' not in st.session_state:
        st.session_state.conversation_history = []
    
    # 标题
    st.title("🤖 FAQ 智能问答系统")
    st.markdown("---")
    
    # 侧边栏配置
    with st.sidebar:
        st.header("⚙️ 系统设置")
        
        api_base_url = st.text_input(
            "API地址", 
            value="http://localhost:8000",
            help="后端API服务地址"
        )
        
        top_k = st.slider("返回结果数", 1, 10, 3)
        threshold = st.slider("相似度阈值", 0.0, 1.0, 0.3, step=0.05)
        
        use_hybrid = st.checkbox("使用混合检索", value=False)
        show_sources = st.checkbox("显示来源", value=True)
        
        st.markdown("---")
        st.info("💡 提示:输入您的问题,系统将为您智能解答")
    
    # 主界面
    col1, col2 = st.columns([2, 1])
    
    with col1:
        # 输入区域
        user_input = st.text_area(
            "请输入您的问题:",
            placeholder="例如:密码忘记了怎么办?如何修改个人信息?",
            height=100
        )
        
        # 查询按钮
        if st.button("🔍 提问", type="primary", use_container_width=True):
            if user_input.strip():
                with st.spinner("🧠 思考中..."):
                    try:
                        # 调用API
                        api_url = f"{api_base_url}/query"
                        payload = {
                            "query": user_input,
                            "top_k": top_k,
                            "threshold": threshold,
                            "use_hybrid": use_hybrid,
                            "include_sources": show_sources
                        }
                        
                        start_time = time.time()
                        response = requests.post(api_url, json=payload)
                        end_time = time.time()
                        
                        if response.status_code == 200:
                            result = response.json()
                            
                            # 显示答案
                            st.success(result['answer'])
                            
                            # 显示性能信息
                            st.info(f"⏱️ 响应时间: {result['response_time']:.3f}秒")
                            
                            # 显示来源
                            if show_sources and result.get('sources'):
                                with st.expander("📋 查看参考来源"):
                                    for i, source in enumerate(result['sources'], 1):
                                        with st.container():
                                            st.markdown(f"**来源 {i}:**")
                                            st.markdown(f"**问题:** {source['question']}")
                                            st.markdown(f"**答案:** {source['answer'][:200]}...")
                                            st.markdown(f"*相似度: {source['score']:.3f}*")
                                            st.markdown("---")
                            
                            # 添加到对话历史
                            st.session_state.conversation_history.append({
                                'query_id': result.get('query_id', ''),  # 保存以便后续提交反馈
                                'question': user_input,
                                'answer': result['answer'],
                                'timestamp': datetime.now().strftime("%H:%M:%S"),
                                'sources': result.get('sources', [])
                            })
                            
                        else:
                            st.error(f"❌ API调用失败: {response.text}")
                    
                    except requests.exceptions.ConnectionError:
                        st.error("❌ 无法连接到后端服务,请检查API地址是否正确")
                    except Exception as e:
                        st.error(f"❌ 发生错误: {str(e)}")
            else:
                st.warning("⚠️ 请输入问题")
    
    with col2:
        st.header("💬 对话历史")
        
        if st.session_state.conversation_history:
            # 显示最近的对话
            for i, conv in enumerate(reversed(st.session_state.conversation_history[-5:])):
                with st.expander(f"⏰ {conv['timestamp']}", expanded=False):
                    st.markdown(f"**您:** {conv['question']}")
                    st.markdown(f"**系统:** {conv['answer'][:100]}...")
        else:
            st.info("暂无对话历史")
        
        # 清空历史
        if st.button("🗑️ 清空历史"):
            st.session_state.conversation_history = []
            st.rerun()
    
    # 反馈功能
    st.markdown("---")
    st.subheader("⭐ 反馈与评价")
    
    if st.session_state.conversation_history:
        latest_conv = st.session_state.conversation_history[-1]
        
        col_rate1, col_rate2 = st.columns(2)
        
        with col_rate1:
            rating = st.select_slider(
                "满意度评分",
                options=[1, 2, 3, 4, 5],
                format_func=lambda x: ['1 ⭐', '2 ⭐', '3 ⭐', '4 ⭐', '5 ⭐'][x-1]
            )
        
        with col_rate2:
            helpful = st.radio("答案是否有帮助?", ("是", "否"), horizontal=True)
        
        feedback_comment = st.text_area("其他意见或建议:", height=60)
        
        if st.button("📤 提交反馈"):
            try:
                feedback_payload = {
                    "query_id": latest_conv.get('query_id', ''),
                    "query": latest_conv['question'],
                    "answer": latest_conv['answer'],
                    "rating": rating,
                    "helpful": helpful == "是",
                    "comment": feedback_comment
                }
                
                feedback_response = requests.post(
                    f"{api_base_url}/feedback", 
                    json=feedback_payload
                )
                
                if feedback_response.status_code == 200:
                    st.success("✅ 感谢您的反馈!")
                else:
                    st.error("❌ 反馈提交失败")
            except Exception:
                st.error("❌ 反馈提交失败")
    
    # 页脚
    st.markdown("---")
    st.caption("Powered by RAG System | Built with Streamlit")
    """
    
    print("Streamlit前端实现代码:")
    print(streamlit_app_code)

streamlit_frontend()

性能优化

系统性能优化策略

def performance_optimization():
    """
    系统性能优化策略
    """
    print("系统性能优化:")
    
    optimization_areas = [
        {
            "优化领域": "向量化性能",
            "策略": "批量处理、GPU加速、模型量化",
            "目标": "提高向量化速度"
        },
        {
            "优化领域": "检索性能", 
            "策略": "索引优化、缓存机制、近似搜索",
            "目标": "毫秒级检索响应"
        },
        {
            "优化领域": "API性能",
            "策略": "异步处理、连接池、负载均衡",
            "目标": "高并发支持"
        },
        {
            "优化领域": "存储性能",
            "策略": "向量数据库优化、数据分区",
            "目标": "高效存储访问"
        }
    ]
    
    for area in optimization_areas:
        print(f"\n{area['优化领域']}:")
        print(f"  策略: {area['策略']}")
        print(f"  目标: {area['目标']}")
    
    # 性能优化代码
    performance_optimization_code = """
    import asyncio
    import hashlib
    from concurrent.futures import ThreadPoolExecutor
    from typing import Dict, List, Optional
    import redis
    import pickle
    from functools import lru_cache
    import time
    
    class PerformanceOptimizer:
        def __init__(self):
            # Redis缓存
            self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
            # 线程池
            self.executor = ThreadPoolExecutor(max_workers=10)
            # 本地缓存
            self.local_cache = {}
        
        @lru_cache(maxsize=1000)
        def cached_embed_query(self, query: str):
            """
            缓存查询向量化结果
            """
            return embedder.encode_single_text(query)
        
        def get_cached_result(self, query: str, top_k: int) -> Optional[Dict]:
            '''从缓存获取结果'''
            # 用稳定哈希生成键:内置hash()跨进程不一致,不适合Redis共享缓存
            key_hash = hashlib.md5(query.encode('utf-8')).hexdigest()
            cache_key = f"search:{key_hash}:{top_k}"
            cached_result = self.redis_client.get(cache_key)
            if cached_result:
                return pickle.loads(cached_result)
            return None
        
        def cache_result(self, query: str, top_k: int, result: Dict, ttl: int = 3600):
            '''缓存搜索结果'''
            key_hash = hashlib.md5(query.encode('utf-8')).hexdigest()
            cache_key = f"search:{key_hash}:{top_k}"
            self.redis_client.setex(
                cache_key,
                ttl,
                pickle.dumps(result)
            )
        
        async def async_batch_search(self, queries: List[str], top_k: int = 3):
            """
            异步批量搜索
            """
            tasks = []
            for query in queries:
                task = asyncio.create_task(self.async_single_search(query, top_k))
                tasks.append(task)
            
            results = await asyncio.gather(*tasks)
            return results
        
        async def async_single_search(self, query: str, top_k: int):
            """
            异步单次搜索
            """
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                self.executor,
                lambda: optimized_retriever.search(query, top_k)
            )
    
    # 使用性能优化器
    optimizer = PerformanceOptimizer()
    """
    
    print("\n性能优化代码:")
    print(performance_optimization_code)

performance_optimization()
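
上面提到的"模型量化"可以用PyTorch动态量化快速尝试:将嵌入模型中的Linear层权重转为int8,通常能加速CPU推理并减小内存占用,但精度与速度收益因模型而异,以下仅为草图:

import torch
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

# 动态量化仅替换Linear层,适用于CPU部署;GPU场景可考虑fp16半精度
quantized_model = torch.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)

embedding = quantized_model.encode(["密码忘记了怎么办?"])
print("量化后向量维度:", embedding.shape)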

实际应用案例

企业应用案例

def enterprise_case_studies():
    """
    企业应用案例分析
    """
    print("企业应用案例:")
    
    case_studies = [
        {
            "行业": "电商平台",
            "应用场景": "客服FAQ系统",
            "规模": "10万+FAQ条目",
            "技术方案": "BGE-large-zh + Milvus + GPT-4",
            "效果": "客服效率提升60%,响应时间<200ms"
        },
        {
            "行业": "金融服务",
            "应用场景": "理财产品咨询",
            "规模": "5万+产品问答",
            "技术方案": "混合检索 + 金融领域微调模型",
            "效果": "准确率92%,用户满意度4.5/5"
        },
        {
            "行业": "教育机构",
            "应用场景": "课程咨询问答",
            "规模": "2万+课程FAQ",
            "技术方案": "Sentence-BERT + FAISS + 本地LLM",
            "效果": "7×24小时服务,成本降低40%"
        },
        {
            "行业": "医疗健康",
            "应用场景": "健康咨询助手",
            "规模": "8万+医学问答",
            "技术方案": "医疗领域专用模型 + 知识图谱",
            "效果": "专业性强,安全性