LangChain 项目实战
LangChain 项目实战
项目概述
经过前面八个章节的学习,我们已经掌握了 LangChain 的核心概念、模型调用、提示词工程、输出解析、文档加载、向量存储、链式调用(LCEL)、记忆机制、检索增强生成(RAG)以及 Agent 智能代理等关键技能。本章将把这些知识融会贯通,从零构建一个完整的 智能知识库助手(Smart Knowledge Base Assistant)项目。
项目目标
构建一个基于 RAG + Agent 架构的智能知识库助手,具备以下核心能力:
- 文档导入:支持 PDF、Markdown、TXT 等格式的文档批量导入与自动解析
- 智能问答:基于私有文档进行精准问答,并提供来源引用
- 主题摘要:自动提取文档主题并生成摘要
- 多轮对话:支持上下文关联的多轮对话,具备记忆能力
- 工具调用:Agent 可自主选择工具完成任务
- 流式响应:通过 API 提供 Server-Sent Events(SSE)流式输出
技术栈总览
| 层级 | 技术选型 | 说明 |
|---|---|---|
| LLM | OpenAI GPT-4o-mini | 可替换为任意兼容 OpenAI 接口的模型 |
| 框架 | LangChain 0.3+ | 核心编排框架 |
| 向量数据库 | Chroma | 轻量级本地向量存储,适合开发和小规模部署 |
| 嵌入模型 | OpenAIEmbeddings | 文本向量化 |
| 会话存储 | Redis | 对话历史持久化 |
| Web 框架 | FastAPI | 高性能异步 API 服务 |
| 部署 | Docker + Docker Compose | 容器化部署 |
[!tip] 模型选择建议
本项目默认使用 OpenAI GPT-4o-mini 作为演示模型。在生产环境中,你可以通过修改配置无缝切换到其他模型提供商,例如使用langchain_community.chat_models中的ChatZhipuAI(智谱)、ChatQwen(通义千问)等国内模型,只需确保模型接口兼容 OpenAI 格式即可。
需求分析
功能需求
根据项目目标,将功能拆解为以下模块:
| 模块 | 功能点 | 优先级 |
|---|---|---|
| 文档处理 | 支持 PDF/MD/TXT 文件上传和解析 | P0 |
| 文档处理 | 自动分块与向量化存储 | P0 |
| 智能问答 | 基于检索结果的精准问答 | P0 |
| 智能问答 | 答案附带来源文档引用 | P0 |
| 主题摘要 | 对指定主题生成摘要 | P1 |
| 主题管理 | 列出知识库中的所有主题 | P1 |
| 对话管理 | 多轮对话历史记录 | P0 |
| 对话管理 | 对话上下文关联 | P1 |
| Agent 调度 | 自动选择工具完成任务 | P0 |
| API 服务 | RESTful API 接口 | P0 |
| API 服务 | SSE 流式响应 | P1 |
技术架构
下面是系统整体的技术架构图,展示了各模块之间的数据流向和依赖关系:
数据流分析
用户发起一次问答的完整数据流如下:
项目结构设计
目录结构
smart-knowledge-assistant/
|
|-- config.py # 配置管理(Pydantic Settings)
|-- main.py # 应用入口,整合所有模块
|-- requirements.txt # Python 依赖
|-- .env # 环境变量(不纳入版本控制)
|-- .env.example # 环境变量模板
|
|-- document_processor.py # 文档处理管道
|-- qa_module.py # RAG 问答模块
|-- agent_tools.py # Agent 工具定义
|-- conversation_manager.py # 对话管理模块
|-- api_server.py # FastAPI 服务层
|
|-- tests/
| |-- __init__.py
| |-- test_document_processor.py
| |-- test_qa_module.py
| |-- test_agent_tools.py
| |-- test_conversation_manager.py
| |-- test_api_server.py
|
|-- data/
| |-- uploads/ # 上传的原始文档
| |-- chroma_db/ # Chroma 向量数据库持久化目录
|
|-- Dockerfile # 容器构建文件
|-- docker-compose.yml # 容器编排文件
模块职责说明
| 模块 | 文件 | 职责 |
|---|---|---|
| 配置管理 | config.py | 集中管理所有配置项,支持环境变量覆盖 |
| 文档处理 | document_processor.py | 文档加载、分块、向量化、存储 |
| 问答模块 | qa_module.py | 检索相关文档、生成答案、附带来源 |
| Agent 工具 | agent_tools.py | 定义可供 Agent 调用的工具集 |
| 对话管理 | conversation_manager.py | 管理对话历史,基于 Redis 持久化 |
| API 服务 | api_server.py | HTTP 接口、请求路由、流式响应 |
| 应用入口 | main.py | 初始化所有模块、启动服务 |
[!warning] 模块依赖顺序
模块之间存在明确的依赖关系:config.py是最底层模块,被所有其他模块依赖;document_processor.py和conversation_manager.py依赖于config.py;qa_module.py依赖于document_processor.py;agent_tools.py依赖于qa_module.py;api_server.py依赖于所有模块。在开发时应按照此顺序逐步实现。
配置管理模块
配置管理是项目的基础设施。我们使用 Pydantic 的 BaseSettings 来实现类型安全的配置管理,支持通过环境变量和 .env 文件灵活配置。
# config.py
from pydantic_settings import BaseSettings
from pydantic import Field
from functools import lru_cache
from typing import Optional
class Settings(BaseSettings):
"""应用配置类,支持环境变量和 .env 文件"""
# ---- LLM 配置 ----
llm_model_name: str = Field(
default="gpt-4o-mini",
description="LLM 模型名称",
)
llm_temperature: float = Field(
default=0.7,
ge=0.0,
le=2.0,
description="LLM 温度参数,控制输出随机性",
)
llm_max_tokens: int = Field(
default=2048,
gt=0,
description="LLM 最大输出 token 数",
)
openai_api_key: str = Field(
...,
description="OpenAI API Key(必填)",
)
openai_api_base: Optional[str] = Field(
default=None,
description="OpenAI API 基础地址,用于代理或兼容接口",
)
# ---- 嵌入模型配置 ----
embedding_model_name: str = Field(
default="text-embedding-3-small",
description="嵌入模型名称",
)
# ---- 向量存储配置 ----
chroma_persist_directory: str = Field(
default="./data/chroma_db",
description="Chroma 向量数据库持久化目录",
)
chroma_collection_name: str = Field(
default="knowledge_base",
description="Chroma 集合名称",
)
# ---- 文档处理配置 ----
chunk_size: int = Field(
default=500,
gt=0,
description="文档分块大小(字符数)",
)
chunk_overlap: int = Field(
default=50,
ge=0,
description="文档分块重叠字符数",
)
upload_directory: str = Field(
default="./data/uploads",
description="上传文件存储目录",
)
# ---- 检索配置 ----
retrieval_top_k: int = Field(
default=4,
gt=0,
description="检索返回的文档片段数量",
)
# ---- Redis 配置 ----
redis_url: str = Field(
default="redis://localhost:6379/0",
description="Redis 连接地址",
)
conversation_ttl: int = Field(
default=3600,
gt=0,
description="对话历史过期时间(秒),默认 1 小时",
)
# ---- API 服务配置 ----
api_host: str = Field(
default="0.0.0.0",
description="API 服务监听地址",
)
api_port: int = Field(
default=8000,
gt=0,
le=65535,
description="API 服务监听端口",
)
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"case_sensitive": False,
}
@lru_cache()
def get_settings() -> Settings:
"""获取全局配置单例"""
return Settings()
对应的 .env.example 模板文件:
# .env.example
# 复制此文件为 .env 并填写实际值
# LLM 配置
OPENAI_API_KEY=sk-your-api-key-here
OPENAI_API_BASE=https://api.openai.com/v1
LLM_MODEL_NAME=gpt-4o-mini
LLM_TEMPERATURE=0.7
LLM_MAX_TOKENS=2048
# 嵌入模型
EMBEDDING_MODEL_NAME=text-embedding-3-small
# 向量存储
CHROMA_PERSIST_DIRECTORY=./data/chroma_db
CHROMA_COLLECTION_NAME=knowledge_base
# 文档处理
CHUNK_SIZE=500
CHUNK_OVERLAP=50
UPLOAD_DIRECTORY=./data/uploads
# 检索
RETRIEVAL_TOP_K=4
# Redis
REDIS_URL=redis://localhost:6379/0
CONVERSATION_TTL=3600
# API 服务
API_HOST=0.0.0.0
API_PORT=8000
配置项一览表:
| 配置项 | 环境变量 | 默认值 | 说明 |
|---|---|---|---|
llm_model_name | LLM_MODEL_NAME | gpt-4o-mini | 使用的 LLM 模型 |
llm_temperature | LLM_TEMPERATURE | 0.7 | 温度参数 |
llm_max_tokens | LLM_MAX_TOKENS | 2048 | 最大输出 token |
openai_api_key | OPENAI_API_KEY | (必填) | API 密钥 |
openai_api_base | OPENAI_API_BASE | None | API 基础地址 |
embedding_model_name | EMBEDDING_MODEL_NAME | text-embedding-3-small | 嵌入模型 |
chroma_persist_directory | CHROMA_PERSIST_DIRECTORY | ./data/chroma_db | Chroma 持久化路径 |
retrieval_top_k | RETRIEVAL_TOP_K | 4 | 检索返回数量 |
redis_url | REDIS_URL | redis://localhost:6379/0 | Redis 地址 |
conversation_ttl | CONVERSATION_TTL | 3600 | 对话过期秒数 |
[!tip] 使用 lru_cache 实现单例
get_settings()函数使用@lru_cache()装饰器,确保整个应用生命周期内只创建一次Settings实例。这在 FastAPI 中通过依赖注入使用时非常方便,既保证了配置的一致性,又避免了重复读取环境变量的开销。
文档处理管道
文档处理管道是 RAG 系统的基础,负责将原始文档转化为可检索的向量数据。整个处理流程分为四个阶段:加载(Load)、分块(Split)、嵌入(Embed)、存储(Store)。
# document_processor.py
import os
from pathlib import Path
from typing import List, Optional
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
UnstructuredMarkdownLoader,
)
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from config import get_settings
class DocumentProcessor:
"""文档处理管道:加载 -> 分块 -> 嵌入 -> 存储"""
# 支持的文件类型与对应加载器的映射
SUPPORTED_EXTENSIONS = {
".pdf": "pdf",
".md": "markdown",
".txt": "text",
}
def __init__(self):
settings = get_settings()
self.chunk_size = settings.chunk_size
self.chunk_overlap = settings.chunk_overlap
self.upload_dir = Path(settings.upload_directory)
self.persist_directory = settings.chroma_persist_directory
self.collection_name = settings.chroma_collection_name
# 初始化嵌入模型
embedding_kwargs = {"model": settings.embedding_model_name}
if settings.openai_api_key:
embedding_kwargs["api_key"] = settings.openai_api_key
if settings.openai_api_base:
embedding_kwargs["base_url"] = settings.openai_api_base
self.embeddings = OpenAIEmbeddings(**embedding_kwargs)
# 初始化文本分块器
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""],
length_function=len,
)
# 确保目录存在
self.upload_dir.mkdir(parents=True, exist_ok=True)
def _get_vectorstore(self) -> Chroma:
"""获取或创建 Chroma 向量存储实例"""
return Chroma(
collection_name=self.collection_name,
embedding_function=self.embeddings,
persist_directory=self.persist_directory,
)
def _load_file(self, file_path: Path) -> List[Document]:
"""根据文件类型选择对应的加载器"""
ext = file_path.suffix.lower()
file_type = self.SUPPORTED_EXTENSIONS.get(ext)
if file_type == "pdf":
loader = PyPDFLoader(str(file_path))
elif file_type == "markdown":
loader = UnstructuredMarkdownLoader(str(file_path))
elif file_type == "text":
loader = TextLoader(str(file_path), encoding="utf-8")
else:
raise ValueError(
f"不支持的文件类型: {ext},"
f"支持的类型: {list(self.SUPPORTED_EXTENSIONS.keys())}"
)
documents = loader.load()
# 为每个文档添加来源元数据
for doc in documents:
doc.metadata["source_file"] = file_path.name
doc.metadata["file_type"] = file_type
return documents
def _split_documents(self, documents: List[Document]) -> List[Document]:
"""将文档列表进行分块处理"""
chunks = self.text_splitter.split_documents(documents)
# 为每个分块添加序号元数据
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_index"] = i
return chunks
def process_file(self, file_path: str) -> dict:
"""
处理单个文件的完整流程。
Args:
file_path: 文件路径
Returns:
处理结果摘要字典
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
# 第一步:加载文档
documents = self._load_file(path)
# 第二步:分块
chunks = self._split_documents(documents)
# 第三步和第四步:嵌入并存储到 Chroma
vectorstore = self._get_vectorstore()
vectorstore.add_documents(chunks)
return {
"file_name": path.name,
"total_documents": len(documents),
"total_chunks": len(chunks),
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
}
def process_directory(self, dir_path: Optional[str] = None) -> List[dict]:
"""
批量处理目录中的所有支持文件。
Args:
dir_path: 目录路径,默认使用上传目录
Returns:
所有文件的处理结果列表
"""
target_dir = Path(dir_path) if dir_path else self.upload_dir
results = []
for file_path in target_dir.rglob("*"):
if file_path.suffix.lower() in self.SUPPORTED_EXTENSIONS:
try:
result = self.process_file(str(file_path))
results.append(result)
except Exception as e:
results.append({
"file_name": file_path.name,
"error": str(e),
})
return results
def get_retriever(self, top_k: Optional[int] = None):
"""获取检索器实例"""
settings = get_settings()
vectorstore = self._get_vectorstore()
return vectorstore.as_retriever(
search_type="mmr",
search_kwargs={
"k": top_k or settings.retrieval_top_k,
"fetch_k": (top_k or settings.retrieval_top_k) * 2,
},
)
[!warning] 分块策略的重要性
文档分块直接影响 RAG 的效果。分块太大,检索结果中会包含大量无关内容;分块太小,又会丢失上下文信息。建议从 500 字符起步,根据实际效果调整。separators参数中的中文标点符号(。、!、?)确保了中文文本在自然断句处分块,避免将一个完整的句子拆分到两个块中。
RAG 问答模块
RAG 问答模块是系统的核心,负责根据用户问题检索相关文档片段,然后交给 LLM 生成带有来源引用的答案。
# qa_module.py
from typing import List, Dict, Any, Optional
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from config import get_settings
from document_processor import DocumentProcessor
# RAG 问答的提示词模板
QA_PROMPT_TEMPLATE = """你是一个专业的知识库问答助手。请根据以下检索到的文档内容来回答用户的问题。
要求:
1. 只根据提供的文档内容回答,不要编造信息
2. 如果文档中没有相关信息,明确告知用户
3. 在答案末尾附上引用的来源文件名
4. 使用清晰、专业的语言
检索到的文档内容:
{context}
用户问题:{question}
请给出详细的回答:"""
class QAModule:
"""RAG 问答模块:检索 + 生成"""
def __init__(self, doc_processor: DocumentProcessor):
settings = get_settings()
self.doc_processor = doc_processor
# 初始化 LLM
llm_kwargs = {
"model": settings.llm_model_name,
"temperature": settings.llm_temperature,
"max_tokens": settings.llm_max_tokens,
"api_key": settings.openai_api_key,
}
if settings.openai_api_base:
llm_kwargs["base_url"] = settings.openai_api_base
self.llm = ChatOpenAI(**llm_kwargs)
# 构建问答链
self._build_chain()
def _build_chain(self):
"""构建 RAG 问答链"""
prompt = ChatPromptTemplate.from_messages([
("system", QA_PROMPT_TEMPLATE),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
])
# 使用 LCEL 构建链
self.chain = (
RunnablePassthrough.assign(
context=lambda x: self._retrieve_context(x["question"]),
)
| prompt
| self.llm
| StrOutputParser()
)
def _retrieve_context(self, question: str) -> str:
"""检索相关文档并格式化为上下文"""
retriever = self.doc_processor.get_retriever()
docs: List[Document] = retriever.invoke(question)
return self._format_docs(docs)
def _format_docs(self, docs: List[Document]) -> str:
"""将文档列表格式化为字符串,附带来源信息"""
formatted_parts = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source_file", "未知来源")
chunk_idx = doc.metadata.get("chunk_index", "?")
formatted_parts.append(
f"[文档片段 {i}] (来源: {source}, 片段序号: {chunk_idx})\n"
f"{doc.page_content}"
)
return "\n\n".join(formatted_parts)
def ask(
self,
question: str,
chat_history: Optional[List] = None,
) -> Dict[str, Any]:
"""
执行问答。
Args:
question: 用户问题
chat_history: 对话历史消息列表
Returns:
包含答案和来源的字典
"""
chat_history = chat_history or []
# 检索相关文档(保留引用信息)
retriever = self.doc_processor.get_retriever()
docs: List[Document] = retriever.invoke(question)
context = self._format_docs(docs)
# 生成答案
answer = self.chain.invoke({
"question": question,
"context": context,
"chat_history": chat_history,
})
# 收集来源信息
sources = list({
doc.metadata.get("source_file", "未知来源")
for doc in docs
})
return {
"answer": answer,
"sources": sources,
"num_docs_retrieved": len(docs),
}
def ask_stream(
self,
question: str,
chat_history: Optional[List] = None,
):
"""
流式问答,逐步返回答案。
Args:
question: 用户问题
chat_history: 对话历史消息列表
Yields:
答案的文本片段
"""
chat_history = chat_history or []
retriever = self.doc_processor.get_retriever()
docs: List[Document] = retriever.invoke(question)
context = self._format_docs(docs)
for chunk in self.chain.stream({
"question": question,
"context": context,
"chat_history": chat_history,
}):
yield chunk
[!tip] MMR 检索策略
代码中使用了 MMR(Maximal Marginal Relevance)检索策略,它的优势在于既能保证检索结果与查询的相关性,又能尽量减少结果之间的冗余。fetch_k参数控制先检索多少候选文档,再从中选出k个最不重复的结果。这对于知识库中包含大量重复信息的场景特别有效。
Agent 工具模块
Agent 的强大之处在于它能自主选择合适的工具来完成任务。本节我们定义三个核心工具,让 Agent 具备文档检索、主题摘要和主题列举的能力。
# agent_tools.py
from typing import List, Optional
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from config import get_settings
from qa_module import QAModule
@tool
def search_docs(query: str) -> str:
"""
在知识库中搜索与查询相关的文档内容。
当用户提出具体问题,需要从知识库中查找答案时使用此工具。
输入应该是清晰、具体的查询语句。
Args:
query: 搜索查询语句
Returns:
检索到的相关文档内容和答案
"""
settings = get_settings()
doc_processor_kwargs = {}
qa = QAModule.from_settings(settings)
result = qa.ask(query)
return result["answer"]
@tool
def summarize_topic(topic: str) -> str:
"""
对知识库中的指定主题生成摘要。
当用户需要了解某个主题的概览或总结时使用此工具。
工具会先检索与该主题相关的所有文档,然后生成摘要。
Args:
topic: 需要摘要的主题关键词
Returns:
主题摘要文本
"""
settings = get_settings()
from document_processor import DocumentProcessor
doc_processor = DocumentProcessor()
retriever = doc_processor.get_retriever(top_k=8)
docs = retriever.invoke(topic)
if not docs:
return f"知识库中未找到与「{topic}」相关的内容。"
# 拼接检索到的内容
context = "\n\n".join([
f"[来源: {doc.metadata.get('source_file', '未知')}]\n{doc.page_content}"
for doc in docs
])
summary_prompt = ChatPromptTemplate.from_messages([
("system", """请根据以下文档内容,生成关于「{topic}」的综合性摘要。
要求:
1. 提取关键信息和核心观点
2. 组织成结构清晰的段落
3. 如果有多个方面的内容,分点说明
4. 保持客观中立的语气
文档内容:
{context}"""),
("human", "请生成关于「{topic}」的摘要。"),
])
llm_kwargs = {
"model": settings.llm_model_name,
"temperature": 0.3,
"api_key": settings.openai_api_key,
}
if settings.openai_api_base:
llm_kwargs["base_url"] = settings.openai_api_base
llm = ChatOpenAI(**llm_kwargs)
chain = summary_prompt | llm | StrOutputParser()
summary = chain.invoke({"topic": topic, "context": context})
sources = list({doc.metadata.get("source_file", "未知") for doc in docs})
return f"{summary}\n\n---\n参考来源: {', '.join(sources)}"
@tool
def list_topics() -> str:
"""
列出知识库中所有可用的主题/文件。
当用户想知道知识库中有什么内容,或者想浏览可用的文档时使用此工具。
Returns:
知识库中的主题和文件列表
"""
from document_processor import DocumentProcessor
from langchain_chroma import Chroma
settings = get_settings()
doc_processor = DocumentProcessor()
vectorstore = doc_processor._get_vectorstore()
# 获取集合中的所有文档元数据
collection = vectorstore._collection
all_metadata = collection.get(include=["metadatas"])
if not all_metadata["metadatas"]:
return "知识库当前为空,请先上传文档。"
# 统计每个文件的分块数量
file_stats = {}
for meta in all_metadata["metadatas"]:
if meta:
file_name = meta.get("source_file", "未知文件")
file_type = meta.get("file_type", "未知类型")
key = (file_name, file_type)
file_stats[key] = file_stats.get(key, 0) + 1
# 格式化输出
result_lines = ["知识库包含以下文档:\n"]
total_chunks = 0
for (file_name, file_type), count in sorted(file_stats.items()):
result_lines.append(f" - {file_name} (类型: {file_type}, 分块数: {count})")
total_chunks += count
result_lines.append(f"\n共 {len(file_stats)} 个文件,{total_chunks} 个文档片段。")
return "\n".join(result_lines)
# 导出所有工具的列表,供 Agent 使用
ALL_TOOLS = [search_docs, summarize_topic, list_topics]
工具注册一览:
| 工具名称 | 函数 | 触发场景 | 输入 | 输出 |
|---|---|---|---|---|
search_docs | 文档检索问答 | 用户提出具体问题 | 查询字符串 | 答案 + 来源 |
summarize_topic | 主题摘要 | 用户需要某主题概览 | 主题关键词 | 结构化摘要 |
list_topics | 列出主题 | 用户浏览知识库内容 | 无 | 文件列表统计 |
[!tip] 工具描述的重要性
Agent 的工具选择完全依赖工具的description。描述越清晰、越具体,Agent 就越能准确地选择合适的工具。每个工具描述都应该明确说明:这个工具做什么、什么情况下使用、输入是什么、输出是什么。@tool装饰器中的 docstring 会被自动提取为工具描述。
对话管理模块
对话管理模块负责维护多轮对话的上下文,基于 Redis 实现会话持久化。这确保了 Agent 在处理多轮对话时能够理解上下文关联的问题。
# conversation_manager.py
import json
from typing import List, Optional
from datetime import timedelta
import redis
from langchain_core.messages import (
BaseMessage,
HumanMessage,
AIMessage,
SystemMessage,
messages_from_dict,
messages_to_dict,
)
from config import get_settings
class ConversationManager:
"""基于 Redis 的对话历史管理器"""
KEY_PREFIX = "conversation:"
def __init__(self):
settings = get_settings()
self.redis_url = settings.redis_url
self.ttl = settings.conversation_ttl
self._client: Optional[redis.Redis] = None
@property
def client(self) -> redis.Redis:
"""懒加载 Redis 客户端"""
if self._client is None:
self._client = redis.from_url(
self.redis_url,
decode_responses=True,
)
return self._client
def _make_key(self, session_id: str) -> str:
"""生成 Redis 存储键"""
return f"{self.KEY_PREFIX}{session_id}"
def get_history(
self,
session_id: str,
limit: Optional[int] = None,
) -> List[BaseMessage]:
"""
获取指定会话的对话历史。
Args:
session_id: 会话标识
limit: 最多返回的消息数量(None 表示全部)
Returns:
消息对象列表
"""
key = self._make_key(session_id)
raw_list = self.client.lrange(key, 0, -1)
if not raw_list:
return []
messages = []
for raw in raw_list:
try:
msg_dict = json.loads(raw)
msg_type = msg_dict.get("type", "human")
content = msg_dict.get("content", "")
if msg_type == "human":
messages.append(HumanMessage(content=content))
elif msg_type == "ai":
messages.append(AIMessage(content=content))
elif msg_type == "system":
messages.append(SystemMessage(content=content))
except (json.JSONDecodeError, KeyError):
continue
# 限制返回数量(保留最近的消息)
if limit and len(messages) > limit:
messages = messages[-limit:]
return messages
def add_message(
self,
session_id: str,
role: str,
content: str,
) -> None:
"""
向会话中添加一条消息。
Args:
session_id: 会话标识
role: 消息角色(human / ai / system)
content: 消息内容
"""
key = self._make_key(session_id)
msg_data = json.dumps({
"type": role,
"content": content,
}, ensure_ascii=False)
# 添加到列表末尾
self.client.rpush(key, msg_data)
# 刷新过期时间
self.client.expire(key, timedelta(seconds=self.ttl))
def add_exchange(
self,
session_id: str,
human_message: str,
ai_message: str,
) -> None:
"""
添加一轮对话(用户消息 + AI 回复)。
Args:
session_id: 会话标识
human_message: 用户消息内容
ai_message: AI 回复内容
"""
self.add_message(session_id, "human", human_message)
self.add_message(session_id, "ai", ai_message)
def clear_history(self, session_id: str) -> None:
"""清除指定会话的所有历史记录"""
key = self._make_key(session_id)
self.client.delete(key)
def get_session_ids(self) -> List[str]:
"""获取所有活跃的会话 ID"""
pattern = f"{self.KEY_PREFIX}*"
keys = self.client.keys(pattern)
return [
key.replace(self.KEY_PREFIX, "")
for key in keys
]
对话管理的核心流程:
[!warning] Redis 连接管理
代码中使用懒加载模式创建 Redis 客户端,避免在模块导入时就尝试连接 Redis。在生产环境中,建议使用连接池管理 Redis 连接,并添加重试和超时机制。如果 Redis 不可用,系统应该能够降级到内存存储模式,而不是直接崩溃。
API 服务层
API 服务层将所有模块整合到 FastAPI 应用中,提供 RESTful 接口和 SSE 流式响应。
# api_server.py
import uuid
from typing import Optional, List
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from config import get_settings
from document_processor import DocumentProcessor
from qa_module import QAModule
from agent_tools import ALL_TOOLS
from conversation_manager import ConversationManager
# ---- 请求/响应模型 ----
class ChatRequest(BaseModel):
"""对话请求"""
question: str
session_id: Optional[str] = None
class ChatResponse(BaseModel):
"""对话响应"""
answer: str
sources: List[str]
session_id: str
class UploadResponse(BaseModel):
"""文件上传响应"""
file_name: str
total_chunks: int
message: str
# ---- 创建 FastAPI 应用 ----
def create_app() -> FastAPI:
"""创建并配置 FastAPI 应用实例"""
settings = get_settings()
app = FastAPI(
title="智能知识库助手 API",
description="基于 LangChain 的 RAG + Agent 智能知识库助手",
version="1.0.0",
)
# CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 初始化各模块(通过 app.state 共享)
@app.on_event("startup")
async def startup():
app.state.doc_processor = DocumentProcessor()
app.state.qa_module = QAModule(app.state.doc_processor)
app.state.conversation_mgr = ConversationManager()
app.state.settings = settings
# ---- 路由定义 ----
@app.get("/api/health")
async def health_check():
"""健康检查接口"""
return {"status": "ok", "service": "smart-knowledge-assistant"}
@app.post("/api/upload", response_model=UploadResponse)
async def upload_document(file: UploadFile = File(...)):
"""
上传文档到知识库。
支持 PDF、Markdown、TXT 格式。
"""
# 验证文件类型
ext = "." + file.filename.rsplit(".", 1)[-1].lower()
if ext not in DocumentProcessor.SUPPORTED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=(
f"不支持的文件类型: {ext},"
f"支持的类型: {list(DocumentProcessor.SUPPORTED_EXTENSIONS.keys())}"
),
)
# 保存文件
upload_dir = Path(settings.upload_directory)
upload_dir.mkdir(parents=True, exist_ok=True)
file_path = upload_dir / file.filename
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# 处理文档
try:
result = app.state.doc_processor.process_file(str(file_path))
return UploadResponse(
file_name=result["file_name"],
total_chunks=result["total_chunks"],
message="文档处理成功",
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"文档处理失败: {str(e)}")
@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""
对话接口:基于 RAG 进行问答。
如果 session_id 为空,将创建新会话。
"""
session_id = request.session_id or str(uuid.uuid4())
# 加载对话历史
history = app.state.conversation_mgr.get_history(session_id)
# 执行问答
result = app.state.qa_module.ask(
question=request.question,
chat_history=history,
)
# 保存对话
app.state.conversation_mgr.add_exchange(
session_id=session_id,
human_message=request.question,
ai_message=result["answer"],
)
return ChatResponse(
answer=result["answer"],
sources=result["sources"],
session_id=session_id,
)
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
"""
流式对话接口:使用 SSE 逐步返回答案。
"""
import json
session_id = request.session_id or str(uuid.uuid4())
history = app.state.conversation_mgr.get_history(session_id)
def event_generator():
full_answer = ""
for chunk in app.state.qa_module.ask_stream(
question=request.question,
chat_history=history,
):
full_answer += chunk
yield f"data: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
# 保存完整对话
app.state.conversation_mgr.add_exchange(
session_id=session_id,
human_message=request.question,
ai_message=full_answer,
)
# 发送结束标记
yield f"data: {json.dumps({'done': True, 'session_id': session_id}, ensure_ascii=False)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.delete("/api/conversation/{session_id}")
async def clear_conversation(session_id: str):
"""清除指定会话的对话历史"""
app.state.conversation_mgr.clear_history(session_id)
return {"message": f"会话 {session_id} 已清除"}
return app
# 需要 Path 导入
from pathlib import Path
API 接口一览:
| 方法 | 路径 | 功能 | 请求体 | 响应 |
|---|---|---|---|---|
GET | /api/health | 健康检查 | 无 | {"status": "ok"} |
POST | /api/upload | 上传文档 | multipart/form-data | 文件名、分块数 |
POST | /api/chat | 问答对话 | {"question", "session_id"} | 答案、来源、会话ID |
POST | /api/chat/stream | 流式问答 | {"question", "session_id"} | SSE 事件流 |
DELETE | /api/conversation/{id} | 清除历史 | 无 | 操作确认 |
完整代码整合
main.py 是应用的入口文件,负责初始化所有模块并启动服务。它将前面定义的所有模块串联在一起,形成一个完整可运行的应用。
# main.py
"""
智能知识库助手 - 应用入口
使用方法:
# 启动服务(开发模式)
python main.py
# 启动服务(生产模式,使用 uvicorn)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# 初始化知识库(导入已有文档)
python main.py --init ./data/uploads
"""
import argparse
import sys
import uvicorn
from config import get_settings
from document_processor import DocumentProcessor
from api_server import create_app
def init_knowledge_base(doc_dir: str) -> None:
"""初始化知识库:批量导入文档"""
print(f"正在初始化知识库,扫描目录: {doc_dir}")
processor = DocumentProcessor()
results = processor.process_directory(doc_dir)
success_count = 0
for result in results:
if "error" in result:
print(f" [失败] {result['file_name']}: {result['error']}")
else:
print(
f" [成功] {result['file_name']} "
f"- {result['total_chunks']} 个分块"
)
success_count += 1
print(f"\n初始化完成: {success_count}/{len(results)} 个文件处理成功")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="智能知识库助手")
parser.add_argument(
"--init",
type=str,
help="初始化知识库,指定文档目录路径",
)
parser.add_argument(
"--host",
type=str,
default=None,
help="服务监听地址(覆盖配置文件)",
)
parser.add_argument(
"--port",
type=int,
default=None,
help="服务监听端口(覆盖配置文件)",
)
args = parser.parse_args()
settings = get_settings()
# 如果指定了初始化,执行文档导入后退出
if args.init:
init_knowledge_base(args.init)
return
# 创建 FastAPI 应用
app = create_app()
# 启动服务
host = args.host or settings.api_host
port = args.port or settings.api_port
print(f"智能知识库助手启动中...")
print(f" 监听地址: {host}:{port}")
print(f" API 文档: http://{host}:{port}/docs")
print(f" 模型: {settings.llm_model_name}")
uvicorn.run(
"api_server:create_app",
host=host,
port=port,
factory=True,
reload=False,
)
if __name__ == "__main__":
main()
requirements.txt 依赖清单:
# requirements.txt
langchain>=0.3.0
langchain-openai>=0.2.0
langchain-chroma>=0.2.0
langchain-text-splitters>=0.3.0
langchain-community>=0.3.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
fastapi>=0.115.0
uvicorn[standard]>=0.30.0
python-multipart>=0.0.9
redis>=5.0.0
pypdf>=4.0.0
unstructured>=0.15.0
项目整体初始化流程:
[!tip] 工厂模式创建应用
create_app()采用工厂模式设计,这是 FastAPI 的最佳实践。它的好处在于:测试时可以创建独立的应用实例,避免测试之间相互影响;使用uvicorn的factory=True参数可以确保每次启动都经过完整的初始化流程。生产环境中建议使用--workers 4启动多个工作进程以提高并发能力。
测试与调试
单元测试
为每个核心模块编写单元测试,确保各模块功能正确。以下是关键模块的测试示例:
# tests/test_document_processor.py
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from document_processor import DocumentProcessor
@pytest.fixture
def mock_settings():
"""模拟配置"""
with patch("document_processor.get_settings") as mock:
settings = MagicMock()
settings.chunk_size = 200
settings.chunk_overlap = 20
settings.upload_directory = "./test_data/uploads"
settings.chroma_persist_directory = "./test_data/chroma"
settings.chroma_collection_name = "test_collection"
settings.embedding_model_name = "text-embedding-3-small"
settings.openai_api_key = "test-key"
settings.openai_api_base = None
settings.retrieval_top_k = 3
mock.return_value = settings
yield settings
class TestDocumentProcessor:
def test_supported_extensions(self, mock_settings):
"""测试支持的文件类型"""
processor = DocumentProcessor()
assert ".pdf" in processor.SUPPORTED_EXTENSIONS
assert ".md" in processor.SUPPORTED_EXTENSIONS
assert ".txt" in processor.SUPPORTED_EXTENSIONS
assert ".docx" not in processor.SUPPORTED_EXTENSIONS
def test_split_documents(self, mock_settings):
"""测试文档分块"""
processor = DocumentProcessor()
from langchain_core.documents import Document
docs = [Document(
page_content="这是第一段内容。" * 30,
metadata={"source_file": "test.txt"},
)]
chunks = processor._split_documents(docs)
assert len(chunks) > 1
# 每个分块应有元数据
for chunk in chunks:
assert "chunk_index" in chunk.metadata
def test_unsupported_file_type(self, mock_settings):
"""测试不支持的文件类型"""
processor = DocumentProcessor()
with pytest.raises(ValueError, match="不支持的文件类型"):
processor._load_file(Path("test.docx"))
# tests/test_conversation_manager.py
import pytest
from unittest.mock import patch, MagicMock
from conversation_manager import ConversationManager
@pytest.fixture
def mock_redis():
"""模拟 Redis 客户端"""
with patch.object(ConversationManager, "client", new_callable=lambda: MagicMock()) as mock:
yield mock
class TestConversationManager:
def test_add_and_retrieve_message(self):
"""测试消息的添加和获取"""
mgr = ConversationManager()
# 模拟 Redis 操作
mgr._client = MagicMock()
mgr._client.lrange.return_value = [
'{"type": "human", "content": "你好"}',
'{"type": "ai", "content": "你好!有什么可以帮助你的?"}',
]
history = mgr.get_history("test-session")
assert len(history) == 2
assert history[0].content == "你好"
assert history[1].content == "你好!有什么可以帮助你的?"
def test_clear_history(self):
"""测试清除历史"""
mgr = ConversationManager()
mgr._client = MagicMock()
mgr.clear_history("test-session")
mgr._client.delete.assert_called_once()
def test_empty_history(self):
"""测试空会话"""
mgr = ConversationManager()
mgr._client = MagicMock()
mgr._client.lrange.return_value = []
history = mgr.get_history("nonexistent-session")
assert len(history) == 0
集成测试
# tests/test_api_server.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock
@pytest.fixture
def client():
"""创建测试客户端"""
with patch("api_server.get_settings") as mock_settings:
settings = MagicMock()
settings.llm_model_name = "gpt-4o-mini"
settings.llm_temperature = 0.7
settings.llm_max_tokens = 2048
settings.openai_api_key = "test-key"
settings.openai_api_base = None
settings.upload_directory = "./test_uploads"
settings.api_host = "127.0.0.1"
settings.api_port = 8000
mock_settings.return_value = settings
from api_server import create_app
app = create_app()
return TestClient(app)
class TestAPIEndpoints:
def test_health_check(self, client):
"""测试健康检查接口"""
response = client.get("/api/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
def test_upload_unsupported_file(self, client):
"""测试上传不支持的文件类型"""
response = client.post(
"/api/upload",
files={"file": ("test.docx", b"content", "application/octet-stream")},
)
assert response.status_code == 400
测试运行方式
# 运行所有测试
pytest tests/ -v
# 运行指定模块的测试
pytest tests/test_document_processor.py -v
# 运行并查看覆盖率
pytest tests/ -v --cov=. --cov-report=term-missing
# 只运行单元测试(跳过集成测试)
pytest tests/ -v -m "not integration"
[!warning] 测试中的 API 调用
单元测试和集成测试中应避免真实的 LLM API 调用。可以通过unittest.mock.patch模拟 LLM 的返回结果,或者使用环境变量OPENAI_API_KEY=test-key配合 mock 使用。真实的端到端测试应在专门的测试环境中进行,并设置 API 调用频率限制以控制成本。
部署指南
Docker 容器化部署
使用 Docker 将应用容器化,确保开发、测试和生产环境的一致性。
首先编写 Dockerfile:
# Dockerfile
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖(PDF 处理需要)
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件并安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制项目代码
COPY . .
# 创建数据目录
RUN mkdir -p /app/data/uploads /app/data/chroma_db
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "main.py"]
然后编写 docker-compose.yml,将应用和 Redis 编排在一起:
# docker-compose.yml
version: "3.8"
services:
app:
build:
context: .
dockerfile: Dockerfile
ports:
- "${API_PORT:-8000}:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENAI_API_BASE=${OPENAI_API_BASE:-}
- LLM_MODEL_NAME=${LLM_MODEL_NAME:-gpt-4o-mini}
- REDIS_URL=redis://redis:6379/0
volumes:
- upload_data:/app/data/uploads
- chroma_data:/app/data/chroma_db
depends_on:
redis:
condition: service_healthy
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/api/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
volumes:
upload_data:
chroma_data:
redis_data:
部署架构
部署步骤
# 1. 克隆项目并进入目录
git clone https://github.com/your-org/smart-knowledge-assistant.git
cd smart-knowledge-assistant
# 2. 创建环境变量文件
cp .env.example .env
# 编辑 .env 文件,填写 OPENAI_API_KEY 等必要配置
# 3. 构建并启动服务
docker compose up -d --build
# 4. 查看服务状态
docker compose ps
docker compose logs -f app
# 5. 初始化知识库(可选:导入已有文档)
docker compose exec app python main.py --init /app/data/uploads
# 6. 验证服务
curl http://localhost:8000/api/health
环境配置说明
| 环境 | LLM 模型 | 向量存储 | Redis | 说明 |
|---|---|---|---|---|
| 开发 | gpt-4o-mini | 本地 Chroma | 本地 Docker | 快速迭代,低成本 |
| 测试 | gpt-4o-mini | 内存 Chroma | Mock | 自动化测试,无外部依赖 |
| 生产 | gpt-4o | Chroma / Milvus | Redis Sentinel | 高可用,高性能 |
[!tip] 生产环境优化建议
- 使用 Nginx 作为反向代理,处理 SSL 终止和负载均衡
- 配置 Redis Sentinel 或 Redis Cluster 保证会话存储高可用
- 向量存储考虑从 Chroma 迁移到 Milvus 或 Pinecone 以获得更好的扩展性
- 使用
--workers参数启动多个 uvicorn 工作进程- 配置日志收集和监控告警(如 Prometheus + Grafana)
- 对上传文件大小设置合理限制,防止滥用
本章小结
本章我们完成了一个完整的 LangChain 实战项目 -- 智能知识库助手。从需求分析到部署上线,涵盖了构建一个生产级 RAG + Agent 应用所需的所有关键环节。让我们回顾一下项目的核心要点:
核心技术点回顾
| 知识点 | 对应章节 | 本项目应用 |
|---|---|---|
| 配置管理 | 第 1 章 | Pydantic Settings + .env |
| 核心架构 | 第 2 章 | 模块化设计,各司其职 |
| 模型调用 | 第 3 章 | ChatOpenAI 统一调用 |
| 提示词工程 | 第 3 章 | QA 提示词模板、摘要提示词 |
| 输出解析 | 第 4 章 | StrOutputParser |
| 文档加载 | 第 5 章 | PyPDFLoader、TextLoader |
| 向量存储 | 第 6 章 | Chroma + OpenAIEmbeddings |
| 链式调用 | 第 7 章 | LCEL 管道构建 RAG 链 |
| 记忆机制 | 第 8 章 | Redis 对话历史管理 |
| RAG | 第 8 章 | 检索增强生成,来源引用 |
| Agent | 第 8 章 | 工具定义与自动调度 |
项目特色
- 模块化设计:每个功能模块独立,职责清晰,便于维护和测试
- 完整的 RAG 流程:从文档导入到精准问答,端到端的数据管道
- Agent 工具调用:LLM 自主决策使用哪个工具,实现智能化调度
- 对话上下文管理:基于 Redis 的会话持久化,支持多轮对话
- 流式响应:SSE 实时输出,提升用户体验
- 容器化部署:Docker Compose 一键部署,环境一致
后续优化方向
如果你想继续深入完善这个项目,可以考虑以下优化方向:
- 文档格式扩展:支持 Word(.docx)、Excel(.xlsx)、HTML 等更多格式
- 多模态支持:处理图片、表格等非文本内容
- 混合检索:结合关键词检索和向量检索,提高召回率
- Reranker 重排序:对检索结果进行二次排序,提升相关性
- 用户认证:添加 JWT 认证,支持多用户隔离
- 前端界面:使用 Vue/React 构建友好的 Web 界面
- 评估体系:建立 RAG 评估指标,量化系统效果
- 多语言支持:处理中英文混合的文档和查询
至此,LangChain 系列教程全部完成。从环境搭建到核心概念,从模型调用到项目实战,我们系统地学习了如何使用 LangChain 构建生产级的 AI 应用。希望这些知识能帮助你在实际工作中构建出更强大的智能应用。