RAG Backend Integration: Connecting FastAPI to a Vector Database in Practice
📂 Stage: Stage 6 - 2026 Featured Topics (AI Integration)
🔗 Related chapters: Streaming Responses (StreamingResponse) · Async Task Queues (Celery)
1. What is RAG?
1.1 One-sentence summary + workflow
RAG = a "semantic indexer" for the knowledge base + a "real-time knowledge patch" for the large model
Put simply: first split the company's internal documents and latest data into small chunks, convert them into numeric vectors, and store them. When a user asks a question, retrieve the most relevant document fragments first, then insert those fragments into the large model's context so that the model generates a grounded answer instead of a fabricated one.
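flowchart LR
A[User question] --> B[Embed the question]
B --> C[Vector similarity search]
D[Document preprocessing] --> E[Chunking]
E --> F[Embed the chunks]
F --> G[Store in vector database]
G --> C
C --> H[Top-K most relevant chunks]
H & A --> I[Build augmented prompt]
I --> J[LLM generates answer]
J --> K[Return answer + cited sources]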
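To make the flow concrete, here is a minimal, dependency-free sketch of retrieve-then-augment. It swaps the real embedding model for a toy bag-of-words vector, so `embed`, `retrieve`, and `build_prompt` are illustrative names rather than part of any library, and the ranking is only a stand-in for true semantic similarity.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts (a stand-in for a real model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, chunks: list[str], top_k: int = 2) -> list[str]:
    """Rank stored chunks by similarity to the query and keep the top_k."""
    q = embed(query)
    ranked = sorted(chunks, key=lambda c: cosine(q, embed(c)), reverse=True)
    return ranked[:top_k]


def build_prompt(query: str, context: list[str]) -> str:
    """Insert the retrieved chunks into the prompt ahead of the question."""
    refs = "\n".join(f"[{i + 1}] {c}" for i, c in enumerate(context))
    return f"Answer using only the references below.\n{refs}\nQuestion: {query}"


chunks = [
    "A refund is processed within 7 business days.",
    "The office is closed on public holidays.",
    "Refund requests require the original order number.",
]
question = "How long does a refund take?"
top = retrieve(question, chunks, top_k=2)
prompt = build_prompt(question, top)
print(prompt)
```

In the real system the same three steps map onto the embedding model, the vector database query, and the LLM call.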
1.2 Why not use pure prompt or fine-tuning?
2. Vector database selection: lightweight Chroma vs production-grade Milvus
A vector database is built to store numeric vectors and run fast similarity searches over them.
A brute-force linear scan in an ordinary database has to compare the query against every stored vector, so its cost grows with the size of the collection and becomes too slow at millions of vectors. Vector databases instead use approximate nearest-neighbor indexes such as HNSW, which typically answer a query in milliseconds.
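For intuition about what an index replaces, here is a sketch of the brute-force baseline: score the query against every stored vector and keep the best matches. The function name `brute_force_search` and the tiny 3-dimensional vectors are illustrative assumptions. The cost grows linearly with the number of stored vectors, which is the work HNSW-style indexes avoid by visiting only a small part of a graph.

```python
import heapq
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two dense vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_search(query: list[float], vectors: dict, top_k: int = 2) -> list[tuple]:
    """Exact search: compare the query with every stored vector, O(N * dim)."""
    scored = ((cosine_similarity(query, vec), doc_id) for doc_id, vec in vectors.items())
    return heapq.nlargest(top_k, scored)


vectors = {
    "a": [1.0, 0.0, 0.0],
    "b": [0.9, 0.1, 0.0],
    "c": [0.0, 1.0, 0.0],
}
results = brute_force_search([1.0, 0.0, 0.0], vectors, top_k=2)
for score, doc_id in results:
    print(doc_id, round(score, 4))
```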
2.1 Chroma (preferred for small and medium scale/prototypes)
No additional server required: it installs with pip, persists data locally, and has a minimal Python API.
Quick installation
pip install chromadb sentence-transformers
Wrap it as a global vector store (vector_store/chroma_store.py)
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer


class ChromaStore:
    def __init__(self, collection_name: str = "docs"):
        # Disable anonymous telemetry; persist data in the local chroma_db directory
        self.client = chromadb.PersistentClient(
            path="./chroma_db",
            settings=Settings(anonymized_telemetry=False)
        )
        # Free, lightweight English embedding model; for Chinese, consider shibing624/text2vec-base-chinese
        self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
        # Create or fetch the collection, using cosine distance
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_doc(self, doc_id: str, text: str, metadata: dict | None = None):
        """Insert a single document."""
        embedding = self.embedding_model.encode(text).tolist()
        self.collection.add(
            ids=[doc_id],
            documents=[text],
            embeddings=[embedding],
            # Some Chroma versions reject an empty metadata dict, so pass None instead
            metadatas=[metadata] if metadata else None
        )

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Semantic search; returns results in a unified format."""
        query_emb = self.embedding_model.encode(query).tolist()
        raw_res = self.collection.query(
            query_embeddings=[query_emb],
            n_results=top_k,
            include=["documents", "metadatas", "distances"]
        )
        # Cosine distance is 1 - cosine similarity (smaller = more similar),
        # so 1 - distance recovers a more intuitive similarity score
        return [
            {
                "id": raw_res["ids"][0][i],
                "text": raw_res["documents"][0][i],
                "metadata": raw_res["metadatas"][0][i] or {},
                "score": round(1 - raw_res["distances"][0][i], 4)
            }
            for i in range(len(raw_res["ids"][0]))
        ]

    def delete_doc(self, doc_id: str):
        self.collection.delete(ids=[doc_id])


# Global instance, loaded when the service starts
vector_db = ChromaStore()
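A note on the score: with "hnsw:space": "cosine", the reported distance is 1 minus the cosine similarity, so it ranges from 0 to 2 and 1 - distance can drop below zero for vectors pointing in opposite directions. If a score strictly within 0 to 1 is wanted, one option is to clamp it. The helper below is a hypothetical sketch (the name `distance_to_score` is not part of Chroma).

```python
def distance_to_score(distance: float) -> float:
    """Map a cosine distance (0 = identical, 2 = opposite) to a score in [0, 1]."""
    similarity = 1.0 - distance  # cosine similarity, in [-1, 1]
    return round(max(0.0, min(1.0, similarity)), 4)


scores = [distance_to_score(d) for d in (0.0, 0.35, 1.2)]
print(scores)
```

Clamping keeps a fixed threshold such as min_score meaningful, at the cost of treating every negatively correlated result as equally irrelevant.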
2.2 Milvus (first choice for production at millions to tens of millions of vectors)
The server has to be started separately (Docker in this example), but Milvus supports distributed deployment and high-concurrency retrieval, which makes it a common choice at large companies.
Quick Start (Docker)
# Download the Milvus Standalone compose file (suited to single-machine testing)
wget https://github.com/milvus-io/milvus/releases/download/v2.4.10/milvus-standalone-docker-compose.yml -O docker-compose.yml
docker-compose up -d
Wrap the core interface (vector_store/milvus_store.py)
from pymilvus import MilvusClient, DataType


class MilvusStore:
    def __init__(self, uri: str = "http://localhost:19530"):
        self.client = MilvusClient(uri=uri)
        self.collection_name = "docs"
        self.embedding_dim = 384  # output dimension of all-MiniLM-L6-v2
        # Create the collection automatically on first start
        if not self.client.has_collection(self.collection_name):
            self._create_collection()

    def _create_collection(self):
        schema = MilvusClient.create_schema(
            auto_id=True,
            enable_dynamic_field=True  # allow extra metadata fields beyond the schema
        )
        schema.add_field("id", DataType.INT64, is_primary=True)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=self.embedding_dim)
        schema.add_field("text", DataType.VARCHAR, max_length=4096)
        schema.add_field("source", DataType.VARCHAR, max_length=255)
        # MilvusClient expects index parameters built via prepare_index_params()
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_type="HNSW",
            metric_type="COSINE",
            params={"M": 8, "efConstruction": 64}
        )
        self.client.create_collection(
            collection_name=self.collection_name,
            schema=schema,
            index_params=index_params
        )
        # Load into memory to speed up retrieval
        self.client.load_collection(self.collection_name)

    def batch_insert(self, texts: list[str], sources: list[str], embeddings: list[list[float]]):
        """Bulk insert documents (prefer batches over single inserts in production)."""
        data = [
            {"vector": emb, "text": text, "source": source}
            for text, source, emb in zip(texts, sources, embeddings)
        ]
        return self.client.insert(collection_name=self.collection_name, data=data)
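The batch_insert method sends whatever lists it is given in one request, so a caller with a large corpus still has to decide how much to send at a time. Below is a minimal sketch of splitting the three parallel lists into fixed-size batches. `iter_batches` and the batch sizes are illustrative assumptions, not part of pymilvus; a suitable size depends on the vector dimension and the server's request limits.

```python
from typing import Iterator


def iter_batches(texts: list, sources: list, embeddings: list,
                 batch_size: int = 500) -> Iterator[tuple]:
    """Yield (texts, sources, embeddings) slices of at most batch_size items."""
    if not (len(texts) == len(sources) == len(embeddings)):
        raise ValueError("texts, sources and embeddings must have the same length")
    for start in range(0, len(texts), batch_size):
        end = start + batch_size
        yield texts[start:end], sources[start:end], embeddings[start:end]


texts = ["t1", "t2", "t3", "t4", "t5"]
sources = ["a.md"] * 5
embeddings = [[0.0] * 4 for _ in range(5)]
batches = list(iter_batches(texts, sources, embeddings, batch_size=2))
print([len(batch[0]) for batch in batches])

# With a MilvusStore instance named store (hypothetical), each batch would be sent as:
#   store.batch_insert(*batch)
```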
3. Document processing: pipeline from plain text to vector
3.1 Chunking strategy (the core optimization point)
A whole document cannot be embedded as a single vector. Chunks that are too large carry too much contextual noise, and chunks that are too small cut semantic links between sentences.
Here is a general-purpose chunker that splits on paragraphs first and adds a sliding-window overlap:
# processing/chunker.py
import re
from typing import List


class TextChunker:
    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        # Max characters per chunk (roughly 100-125 English tokens;
        # Chinese text typically yields more tokens per character)
        self.chunk_size = chunk_size
        # Characters carried over from the previous chunk to preserve context
        self.overlap = overlap

    def _make_chunk(self, text: str, source: str, idx: int) -> dict:
        return {
            "id": f"{source.replace('/', '_')}_{idx}",
            "text": text.strip(),
            "metadata": {"source": source, "chunk_idx": idx}
        }

    def chunk_text(self, text: str, source: str = "") -> List[dict]:
        """Split on paragraphs first, merge short paragraphs, and start a new
        chunk when the size limit would be exceeded. A single paragraph longer
        than chunk_size is kept whole."""
        # Paragraphs are separated by two or more newlines
        paragraphs = [p.strip() for p in re.split(r'\n{2,}', text) if p.strip()]
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            # Still under the limit with this paragraph appended: just append it
            if len(current_chunk) + len(para) + 2 < self.chunk_size:
                current_chunk += para + "\n\n"
                continue
            # Limit reached: save the current chunk first
            tail = ""
            if current_chunk.strip():
                chunks.append(self._make_chunk(current_chunk, source, len(chunks)))
                # Sliding window: carry the last `overlap` characters of the saved chunk forward
                if self.overlap > 0:
                    tail = current_chunk.strip()[-self.overlap:]
            current_chunk = (tail + "\n\n" if tail else "") + para + "\n\n"
        # Handle the final chunk
        if current_chunk.strip():
            chunks.append(self._make_chunk(current_chunk, source, len(chunks)))
        return chunks
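The chunker above merges paragraphs but never cuts inside one, so a single paragraph longer than chunk_size ends up as one oversized chunk. One way to handle that case is a fixed-size sliding window over characters. The sketch below is an assumption about how this could be added (the name `split_with_overlap` is hypothetical), not part of the chunker itself.

```python
def split_with_overlap(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Cut text into windows of chunk_size characters that share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    pieces = []
    for start in range(0, len(text), step):
        pieces.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return pieces


pieces = split_with_overlap("abcdefghijklmnopqrstuvwxyz", chunk_size=10, overlap=3)
print(pieces)
```

Each window starts chunk_size - overlap characters after the previous one, so the last `overlap` characters of one piece reappear at the start of the next.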
4. FastAPI + RAG: building a deployable Q&A API
4.1 Core RAG service wrapper (services/rag_service.py)
import asyncio
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from vector_store.chroma_store import vector_db

# Load sensitive settings (API key, etc.) from .env
load_dotenv()


class RAGService:
    def __init__(self):
        # Embedding is handled inside vector_db, so no second model is loaded here.
        # OpenAI-compatible client, which makes it easy to point at DeepSeek, Qwen, Ollama, etc.
        self.llm_client = AsyncOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
        )

    async def _retrieve(self, query: str, top_k: int = 5, min_score: float = 0.4) -> list[dict]:
        """Internal retrieval; filters out low-similarity noise."""
        # vector_db.search is blocking (embedding + query), so run it in a worker thread
        results = await asyncio.to_thread(vector_db.search, query, top_k)
        return [r for r in results if r["score"] >= min_score]

    async def generate_answer(self, query: str, top_k: int = 5, min_score: float = 0.4, stream: bool = False):
        """Public entry point: retrieve, then generate.

        Returns {"answer", "sources"} when nothing is retrieved,
        otherwise {"response", "sources"} with the raw LLM response (or stream).
        """
        # 1. Retrieve
        docs = await self._retrieve(query, top_k, min_score)
        if not docs:
            return {
                "answer": "Sorry, nothing relevant was found in the knowledge base. Please try rephrasing.",
                "sources": []
            }
        # 2. Build the augmented prompt
        parts = []
        for i, doc in enumerate(docs):
            meta = doc["metadata"] or {}
            parts.append(
                f"[Source {i+1}]({meta.get('source', '')}, chunk {meta.get('chunk_idx', '')})\n{doc['text']}"
            )
        context = "\n\n".join(parts)
        prompt = f"""You are a rigorous knowledge-base assistant. Follow these rules strictly:
1. Answer only from the reference material; do not make up anything beyond it
2. Keep the answer concise and clear
3. If the material contains no clear answer, reply "No relevant information found"

Reference material:
{context}

User question: {query}"""
        # 3. Call the LLM
        llm_response = await self.llm_client.chat.completions.create(
            model=os.getenv("LLM_MODEL", "gpt-4o-mini"),
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,  # low temperature keeps answers stable
            max_tokens=1000,
            stream=stream
        )
        return {"response": llm_response, "sources": docs}
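The service concatenates every retrieved chunk into the prompt without a size limit, so a large top_k combined with long chunks can push the prompt past the model's context window. A minimal sketch of capping the context by character count follows. `build_context` and the `max_chars` budget are hypothetical, and a token-based budget would be more precise than counting characters.

```python
def build_context(docs: list[dict], max_chars: int = 2000) -> tuple[str, list[dict]]:
    """Keep the highest-scoring chunks whose combined text fits within max_chars."""
    used, kept = 0, []
    for doc in sorted(docs, key=lambda d: d["score"], reverse=True):
        if used + len(doc["text"]) > max_chars:
            continue  # skip chunks that would exceed the budget
        kept.append(doc)
        used += len(doc["text"])
    context = "\n\n".join(f"[Source {i + 1}]\n{d['text']}" for i, d in enumerate(kept))
    return context, kept


docs = [
    {"text": "a" * 60, "score": 0.9},
    {"text": "b" * 50, "score": 0.8},
    {"text": "c" * 30, "score": 0.7},
]
context, kept = build_context(docs, max_chars=100)
print([d["score"] for d in kept])
```

Returning the kept documents alongside the context lets the caller cite only the sources that actually reached the model.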
4.2 FastAPI routes (routers/rag.py)
import json

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from processing.chunker import TextChunker
from services.rag_service import RAGService
from vector_store.chroma_store import vector_db

router = APIRouter(prefix="/api/rag", tags=["RAG API"])
rag_service = RAGService()
chunker = TextChunker()


# Request models (streaming is chosen by the route, not by a request field)
class QueryReq(BaseModel):
    query: str
    top_k: int = 5
    min_score: float = 0.4


class IngestReq(BaseModel):
    text: str
    source: str = "manual_input"


# ──────────────────────────────────────
# Plain Q&A endpoint
# ──────────────────────────────────────
@router.post("/query")
async def rag_query(req: QueryReq):
    result = await rag_service.generate_answer(**req.model_dump(), stream=False)
    # Nothing retrieved: the service returns a ready-made answer and no LLM response
    if not result["sources"]:
        return {"answer": result["answer"], "sources": []}
    return {
        "answer": result["response"].choices[0].message.content,
        "sources": [
            {
                "text": d["text"][:200] + ("..." if len(d["text"]) > 200 else ""),
                "source": d["metadata"]["source"],
                "score": d["score"]
            }
            for d in result["sources"]
        ]
    }


# ──────────────────────────────────────
# Streaming Q&A endpoint (typewriter effect)
# ──────────────────────────────────────
@router.post("/query/stream")
async def rag_query_stream(req: QueryReq):
    # Always request a streamed LLM response on this route
    result = await rag_service.generate_answer(**req.model_dump(), stream=True)
    if not result["sources"]:
        return {"answer": "No relevant content found", "sources": []}

    async def event_generator():
        # Send the cited sources first
        yield f"data: {json.dumps({'type': 'sources', 'content': [d['metadata'] for d in result['sources']]})}\n\n"
        # Then push tokens one by one; some chunks carry no choices or empty content
        async for chunk in result["response"]:
            if chunk.choices and (token := chunk.choices[0].delta.content):
                yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
        # End marker
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )


# ──────────────────────────────────────
# Manual document ingestion endpoint (for development and testing)
# ──────────────────────────────────────
@router.post("/ingest")
async def ingest(req: IngestReq):
    chunks = chunker.chunk_text(req.text, req.source)
    for chunk in chunks:
        # The chunker emits "id", while add_doc expects "doc_id"
        vector_db.add_doc(doc_id=chunk["id"], text=chunk["text"], metadata=chunk["metadata"])
    return {"status": "success", "chunks_count": len(chunks)}
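On the client side, the response from /query/stream arrives as Server-Sent Events: each event is a "data: " line holding one JSON object, followed by a blank line. The sketch below parses such a payload and reassembles the answer. `parse_sse` is a hypothetical helper, and the sample payload is made up to match the three event types the endpoint emits (sources, token, done).

```python
import json


def parse_sse(raw: str) -> list[dict]:
    """Parse a raw SSE payload into a list of decoded JSON events."""
    events = []
    for block in raw.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events


sample = (
    'data: {"type": "sources", "content": [{"source": "faq.md", "chunk_idx": 0}]}\n\n'
    'data: {"type": "token", "content": "Hel"}\n\n'
    'data: {"type": "token", "content": "lo"}\n\n'
    'data: {"type": "done"}\n\n'
)
events = parse_sse(sample)
answer = "".join(e["content"] for e in events if e["type"] == "token")
print(answer)
```

A real client would read the response incrementally and stop when the done event arrives, rather than parsing one complete string.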
5. Quick start test
- Organize the code structure and create a .env file in the project root directory:
OPENAI_API_KEY=your_actual_key
# Any OpenAI-compatible endpoint can be used here
OPENAI_BASE_URL=https://api.deepseek.com/v1
LLM_MODEL=deepseek-chat
- Start FastAPI (this assumes a main.py that creates the app and registers the router from routers/rag.py):
pip install python-multipart fastapi uvicorn python-dotenv openai
uvicorn main:app --reload
- Open http://localhost:8000/docs in a browser and use Swagger UI to debug all the endpoints directly.
6. Summary & Optimization Direction
✅ What we built
A complete RAG backend was built from scratch:
- A vector store wrapper that works for prototyping and can be upgraded for production
- A general-purpose chunker that splits on paragraphs first and adds a sliding-window overlap
- A RAG Q&A API that cites sources and supports streaming output
🚀 Next optimization directions (open items)
- Chunking optimization: split by heading level, and handle tables and images separately
- Retrieval enhancement: add a reranking step with Cohere or BGE-Reranker
- Asynchronous ingestion: use Celery to process large file uploads so they do not block the API
- Chinese adaptation: switch to a Chinese-specific embedding model such as shibing624/text2vec-base-chinese