链式调用与 LCEL
链式调用与 LCEL
什么是链式调用
核心思想
在实际的 AI 应用中,单一的模型调用往往无法满足复杂的业务需求。我们通常需要将多个步骤串联起来:先格式化提示词,再调用模型,最后解析输出。这种将多个组件按顺序连接起来、前一步的输出作为后一步的输入的工作方式,就是链式调用(Chaining)。
如果说 LLM 是一个强大的「计算单元」,那么链式调用就是将这些计算单元、预处理步骤和后处理步骤组装成完整「流水线」的机制。
为什么需要链式调用
| 需求 | 不使用链式调用 | 使用链式调用 |
|---|---|---|
| 多步骤处理 | 手动调用、手动传参 | 自动传递数据 |
| 代码复用 | 复制粘贴,难以维护 | 组合子链,灵活拼接 |
| 流式输出 | 需要自己实现缓冲和推送 | 原生支持 stream |
| 异步执行 | 手动管理 async/await | 自动异步调度 |
| 并行分支 | 手动开线程或协程 | RunnableParallel 声明式并行 |
| 调试追踪 | 到处加 print | 内置回调与追踪 |
从手动调用到链式调用
下面是一个对比示例,展示手动多步调用与链式调用的差异:
# --- 手动多步调用:步骤多、数据传递靠手写 ---
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o-mini")
# 第一步:格式化提示词
prompt = ChatPromptTemplate.from_template(
"请用简洁的语言解释:{concept}"
)
messages = prompt.invoke({"concept": "量子计算"})
# 第二步:调用模型
response = llm.invoke(messages)
# 第三步:解析输出
parser = StrOutputParser()
result = parser.invoke(response)
print(result)
# --- 链式调用:一行搞定,数据自动传递 ---
chain = prompt | llm | parser
result = chain.invoke({"concept": "量子计算"})
print(result)
[!tip] 链式调用的核心优势
链式调用不仅仅是语法糖。它将组件之间的数据传递、错误处理、流式输出、异步执行等细节统一封装,让开发者专注于业务逻辑本身。
LCEL 基础语法
什么是 LCEL
LCEL(LangChain Expression Language) 是 LangChain 提供的声明式组合语法。它通过管道操作符 | 将多个组件连接起来,构建处理流水线。LCEL 的设计哲学是:任何组件只要实现了 Runnable 接口,就可以用 | 连接。
Runnable 接口
Runnable 是 LCEL 的基础协议。所有支持 LCEL 的组件都实现了这个接口,它提供了一套统一的调用方法:
| 方法 | 说明 | 同步/异步 |
|---|---|---|
invoke(input) | 单次调用,传入单个输入 | 同步 |
batch(inputs) | 批量调用,传入输入列表 | 同步 |
stream(input) | 流式调用,逐步返回结果 | 同步 |
ainvoke(input) | 单次调用(异步版本) | 异步 |
abatch(inputs) | 批量调用(异步版本) | 异步 |
astream(input) | 流式调用(异步版本) | 异步 |
transform(input) | 逐块处理流式输入 | 同步 |
atransform(input) | 逐块处理流式输入(异步版本) | 异步 |
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("用一句话解释:{topic}")
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
chain = prompt | llm | parser
# 1. 单次调用
result = chain.invoke({"topic": "区块链"})
print("invoke:", result)
# 2. 批量调用
results = chain.batch([
{"topic": "区块链"},
{"topic": "机器学习"},
{"topic": "量子计算"},
])
print("batch:", results)
# 3. 流式调用
for chunk in chain.stream({"topic": "区块链"}):
print(chunk, end="", flush=True)
管道操作符的本质
| 操作符的背后是 RunnableSequence——它将两个 Runnable 连接成一个新 Runnable:
# 以下两种写法完全等价
# 写法一:使用管道操作符
chain = prompt | llm | parser
# 写法二:显式构造 RunnableSequence
from langchain_core.runnables import RunnableSequence
chain = RunnableSequence(first=prompt, middle=[llm], last=parser)
[!warning] 注意类型匹配
管道连接时,前一个组件的输出类型必须与后一个组件的输入类型兼容。例如ChatPromptTemplate输出ChatPromptValue,ChatOpenAI接收ChatPromptValue,所以它们可以直接用|连接。如果类型不匹配,运行时会抛出异常。
基础链
最经典的链:prompt | llm | parser
这是 LangChain 中最基础、最常用的链式组合:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 1. 创建组件
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的技术顾问,回答要简洁准确。"),
("human", "{question}"),
])
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
parser = StrOutputParser()
# 2. 组装链
chain = prompt | llm | parser
# 3. 调用链
answer = chain.invoke({"question": "Python 中 GIL 是什么?"})
print(answer)
数据流转过程:
输出解析器详解
除了 StrOutputParser 之外,LangChain 还提供了多种输出解析器:
| 解析器 | 功能 | 典型用途 |
|---|---|---|
StrOutputParser | 提取 AIMessage 的纯文本 | 通用文本生成 |
CommaSeparatedListOutputParser | 解析逗号分隔的列表 | 关键词提取 |
JsonOutputParser | 解析 JSON 格式输出 | 结构化数据提取 |
PydanticOutputParser | 解析为 Pydantic 模型 | 严格类型校验 |
from langchain_core.output_parsers import (
StrOutputParser,
CommaSeparatedListOutputParser,
JsonOutputParser,
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
llm = ChatOpenAI(model="gpt-4o-mini")
# 示例1:列表解析器
list_parser = CommaSeparatedListOutputParser()
list_prompt = ChatPromptTemplate.from_template(
"列出5个{category}相关的技术关键词。\n{format_instructions}"
)
list_chain = list_prompt | llm | list_parser
keywords = list_chain.invoke({
"category": "前端开发",
"format_instructions": list_parser.get_format_instructions(),
})
print(type(keywords)) # <class 'list'>
print(keywords) # ['Vue.js', 'React', 'TypeScript', ...]
# 示例2:JSON 解析器
class BookInfo(BaseModel):
title: str = Field(description="书名")
author: str = Field(description="作者")
year: int = Field(description="出版年份")
summary: str = Field(description="简介")
json_parser = JsonOutputParser(pydantic_object=BookInfo)
json_prompt = ChatPromptTemplate.from_template(
"请介绍这本{genre}领域的经典书籍。\n{format_instructions}"
)
json_chain = json_prompt | llm | json_parser
book = json_chain.invoke({
"genre": "编程",
"format_instructions": json_parser.get_format_instructions(),
})
print(type(book)) # <class 'dict'>
print(book["title"], book["author"])
[!tip] 选择解析器的建议
如果只需要纯文本输出,使用StrOutputParser即可。如果需要结构化数据,优先考虑with_structured_output方法(上一章介绍过),它在大多数场景下比PydanticOutputParser更可靠。JsonOutputParser适合需要手动控制 JSON Schema 的场景。
流式输出
链式调用原生支持流式输出,这在聊天场景中尤为重要:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template(
"写一篇关于{topic}的200字短文"
)
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
parser = StrOutputParser()
chain = prompt | llm | parser
# 流式输出 — 逐字打印
print("=== 流式输出 ===")
for chunk in chain.stream({"topic": "人工智能的未来"}):
print(chunk, end="", flush=True)
print()
# 异步流式输出
import asyncio
async def async_stream():
print("=== 异步流式输出 ===")
async for chunk in chain.astream({"topic": "人工智能的未来"}):
print(chunk, end="", flush=True)
print()
asyncio.run(async_stream())
RunnableParallel
并行执行多个分支
RunnableParallel 允许将一个输入同时传递给多个分支并行处理,然后将结果合并为一个字典。这在需要同时获取多种分析结果的场景中非常有用。
使用字典语法的快捷方式
RunnableParallel 有两种等价的写法——使用类构造和使用字典字面量:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 定义三个子链
summary_prompt = ChatPromptTemplate.from_template(
"用一句话总结以下内容:{text}"
)
keywords_prompt = ChatPromptTemplate.from_template(
"从以下内容中提取5个关键词,用逗号分隔:{text}"
)
sentiment_prompt = ChatPromptTemplate.from_template(
"判断以下内容的情感倾向(正面/负面/中性),只输出一个词:{text}"
)
summary_chain = summary_prompt | llm | parser
keywords_chain = keywords_prompt | llm | parser
sentiment_chain = sentiment_prompt | llm | parser
# 写法一:使用 RunnableParallel 类
parallel = RunnableParallel(
summary=summary_chain,
keywords=keywords_chain,
sentiment=sentiment_chain,
)
# 写法二:使用字典字面量(推荐,更简洁)
parallel = {
"summary": summary_chain,
"keywords": keywords_chain,
"sentiment": sentiment_chain,
}
# 调用
text = "这家新开的咖啡店环境很好,咖啡品质也不错,价格也合理。唯一不足是座位有点少,高峰期需要等位。"
result = parallel.invoke({"text": text})
print("摘要:", result["summary"])
print("关键词:", result["keywords"])
print("情感:", result["sentiment"])
[!warning] RunnableParallel 与字典字面量的区别
在 LCEL 中,当一个字典被放在|管道中时,LangChain 会自动将其转换为RunnableParallel。但如果不在管道中使用,直接调用字典的.invoke()方法会报错。如果需要独立调用,请使用RunnableParallel(...)构造。
在管道中使用并行
RunnableParallel 最常见的用法是嵌入到链的中间,用于准备下一步所需的多个输入:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 先并行生成「摘要」和「关键词」,再合并交给下一步
analysis_chain = (
RunnableParallel(
summary=ChatPromptTemplate.from_template("用一句话总结:{text}") | llm | parser,
keywords=ChatPromptTemplate.from_template("提取3个关键词,逗号分隔:{text}") | llm | parser,
original=RunnablePassthrough(),
)
| ChatPromptTemplate.from_template(
"基于以下分析结果,写一段综合评价:\n"
"摘要:{summary}\n"
"关键词:{keywords}\n"
"原文:{original}"
)
| llm
| parser
)
result = analysis_chain.invoke({"text": "LangChain 是一个用于构建 LLM 应用的框架,支持链式调用和多种数据源的接入。"})
print(result)
RunnablePassthrough
传递原始输入
RunnablePassthrough 是一个「透传」组件——它将输入原封不动地传递到输出。这个看似简单的组件在实际使用中非常重要,主要用于在 RunnableParallel 中保留原始数据。
from langchain_core.runnables import RunnablePassthrough
# RunnablePassthrough 直接将输入传到输出
passthrough = RunnablePassthrough()
result = passthrough.invoke("Hello")
print(result) # "Hello"
result = passthrough.invoke({"name": "Alice", "age": 30})
print(result) # {"name": "Alice", "age": 30}
在并行链中保留原始字段
当使用 RunnableParallel 时,如果某个分支处理后丢失了原始数据,可以用 RunnablePassthrough 保留它:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
llm = ChatOpenAI(model="gpt-4o-mini")
# 场景:分析用户问题,同时保留原始问题
chain = RunnablePassthrough.assign(
# 保留原始输入,同时新增一个 analysis 字段
analysis=ChatPromptTemplate.from_template(
"分析这个问题的意图和技术领域:{question}"
) | llm | StrOutputParser(),
)
result = chain.invoke({"question": "如何优化 Python 的内存使用?"})
print(result)
# {
# "question": "如何优化 Python 的内存使用?",
# "analysis": "这是一个关于 Python 性能优化的问题,涉及内存管理..."
# }
使用 itemgetter 提取字段
在链式调用中,经常需要从字典中提取特定字段。operator.itemgetter 可以在 LCEL 中直接使用:
from operator import itemgetter
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 场景:用户输入包含 question 和 context 两个字段
# 链需要分别提取并使用它们
chain = (
RunnablePassthrough.assign(
# 从输入中提取 context,传给模型生成背景知识
background=itemgetter("context")
| ChatPromptTemplate.from_template("简要总结以下背景知识:{context}")
| llm
| parser,
)
| ChatPromptTemplate.from_template(
"基于背景知识:{background}\n\n回答问题:{question}"
)
| llm
| parser
)
result = chain.invoke({
"question": "什么是 RAG?",
"context": "RAG(Retrieval-Augmented Generation)是一种结合了信息检索和文本生成的技术方案。它先从知识库中检索相关文档,再将文档内容作为上下文传递给大语言模型,从而生成更准确的回答。",
})
print(result)
RunnablePassthrough.assign 的典型用法
RunnablePassthrough.assign() 是一个常用方法,它在保留原始输入的同时添加新字段:
from langchain_core.runnables import RunnablePassthrough
# assign 会保留原有字段,并添加新字段
enrich = RunnablePassthrough.assign(
extra_key=lambda x: f"processed: {x['raw_key']}",
)
result = enrich.invoke({"raw_key": "hello", "other": 123})
print(result)
# {"raw_key": "hello", "other": 123, "extra_key": "processed: hello"}
RunnableLambda
将自定义函数嵌入链
不是所有逻辑都需要调用大模型。有时候我们需要在链中插入自定义的 Python 函数,比如数据清洗、格式转换、条件判断等。RunnableLambda 可以将任意 Python 函数包装成 Runnable,从而在 LCEL 管道中使用。
from langchain_core.runnables import RunnableLambda
# 方式一:显式构造
def to_upper(text: str) -> str:
return text.upper()
upper_runnable = RunnableLambda(to_upper)
print(upper_runnable.invoke("hello")) # "HELLO"
# 方式二:使用 @chain 装饰器(推荐)
from langchain_core.runnables import chain
@chain
def format_output(raw_text: str) -> dict:
"""将原始文本格式化为结构化输出"""
word_count = len(raw_text)
return {
"text": raw_text.strip(),
"length": word_count,
"preview": raw_text[:50] + "..." if word_count > 50 else raw_text,
}
result = format_output.invoke(" 这是一段需要处理的文本内容 ")
print(result)
# {"text": "这是一段需要处理的文本内容", "length": 14, "preview": "这是一段需要处理的文本内容"}
在链中使用 RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
llm = ChatOpenAI(model="gpt-4o-mini")
# 自定义处理函数
def clean_and_split(text: str) -> list[str]:
"""清洗并分割文本为列表"""
items = text.strip().split("\n")
return [item.strip().lstrip("0123456789.-) ") for item in items if item.strip()]
def format_as_markdown(items: list[str]) -> str:
"""将列表格式化为 Markdown 列表"""
return "\n".join(f"- {item}" for item in items)
# 组装链
chain = (
ChatPromptTemplate.from_template("列出5个学习{language}的理由,每行一个")
| llm
| StrOutputParser()
| RunnableLambda(clean_and_split) # 拆分为列表
| RunnableLambda(format_as_markdown) # 再格式化
)
result = chain.invoke({"language": "Rust"})
print(result)
# - 内存安全,无需垃圾回收
# - 高性能,零成本抽象
# - 强大的类型系统
# - 优秀的并发支持
# - 活跃的开源社区
[!tip] RunnableLambda 的输入类型
RunnableLambda的函数参数类型取决于链中上一个组件的输出类型。例如,StrOutputParser输出str,所以下游的RunnableLambda函数应该接收str参数。如果上游输出的是dict,则函数应接收dict参数。
带状态的 RunnableLambda
有时候自定义函数需要访问外部状态(如数据库、缓存等):
from langchain_core.runnables import RunnableLambda
class TranslationCache:
"""简单的翻译缓存"""
def __init__(self):
self.cache: dict[str, str] = {}
def lookup(self, text: str) -> str:
"""先查缓存,缓存未命中则标记"""
if text in self.cache:
return f"[cached] {self.cache[text]}"
return text # 返回原文,后续交给 LLM 翻译
def store(self, pair: dict) -> dict:
"""将翻译结果存入缓存"""
original = pair.get("original", "")
translated = pair.get("translated", "")
if original and translated:
self.cache[original] = translated
return pair
cache = TranslationCache()
# 在链中使用
lookup_runnable = RunnableLambda(cache.lookup)
store_runnable = RunnableLambda(cache.store)
条件分支
RunnableBranch 基本用法
RunnableBranch 类似于编程语言中的 if-elif-else 结构。它接收一组「条件-分支」对,按顺序检查每个条件,执行第一个条件为 True 的分支。如果所有条件都不满足,执行默认分支。
from langchain_core.runnables import RunnableBranch, RunnableLambda
def check_chinese(text: str) -> bool:
"""检查是否包含中文"""
return any("一" <= char <= "鿿" for char in text)
def check_code(text: str) -> bool:
"""检查是否包含代码"""
return "```" in text or "def " in text or "function " in text
def handle_chinese(text: str) -> str:
return f"[中文内容处理] {text}"
def handle_code(text: str) -> str:
return f"[代码内容处理] {text}"
def handle_default(text: str) -> str:
return f"[默认内容处理] {text}"
branch = RunnableBranch(
(check_chinese, RunnableLambda(handle_chinese)),
(check_code, RunnableLambda(handle_code)),
RunnableLambda(handle_default), # 默认分支(没有条件)
)
print(branch.invoke("你好世界")) # [中文内容处理] 你好世界
print(branch.invoke("def hello():")) # [代码内容处理] def hello():
print(branch.invoke("Hello World")) # [默认内容处理] Hello World
实际应用:多语言翻译链
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
def is_chinese(text: str) -> bool:
return any("一" <= char <= "鿿" for char in text)
def is_english(text: str) -> bool:
return all(char.isascii() for char in text.strip())
# 中文 -> 英文
zh_to_en_prompt = ChatPromptTemplate.from_template(
"将以下中文翻译为英文,只输出翻译结果:{text}"
)
zh_to_en = zh_to_en_prompt | llm | parser
# 英文 -> 中文
en_to_zh_prompt = ChatPromptTemplate.from_template(
"将以下英文翻译为中文,只输出翻译结果:{text}"
)
en_to_zh = en_to_zh_prompt | llm | parser
# 其他语言 -> 中文
other_to_zh_prompt = ChatPromptTemplate.from_template(
"将以下文本翻译为中文,只输出翻译结果:{text}"
)
other_to_zh = other_to_zh_prompt | llm | parser
# 组装条件分支
translate_chain = RunnableBranch(
(is_chinese, zh_to_en),
(is_english, en_to_zh),
other_to_zh, # 默认:翻译为中文
)
# 使用
print(translate_chain.invoke("人工智能正在改变世界"))
print(translate_chain.invoke("The quick brown fox jumps over the lazy dog"))
print(translate_chain.invoke("Bonjour le monde"))
[!warning] RunnableBranch 的条件函数签名
条件函数接收的参数类型与RunnableBranch整体接收的输入类型一致。在上面的例子中,invoke传入的是字符串,所以is_chinese和is_english函数的参数也是字符串。如果RunnableBranch放在链中间,输入可能是字典,条件函数就需要处理字典类型。
链的组合与嵌套
将子链作为组件
LCEL 的强大之处在于,任何链本身就是一个 Runnable,可以作为更复杂链的组成部分。这意味着你可以像搭积木一样组合链:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 子链1:摘要生成
summary_chain = (
ChatPromptTemplate.from_template("用一句话总结:{text}")
| llm | parser
)
# 子链2:关键词提取
keywords_chain = (
ChatPromptTemplate.from_template("提取3个关键词,逗号分隔:{text}")
| llm | parser
)
# 子链3:标题生成
title_chain = (
ChatPromptTemplate.from_template("为以下内容生成一个简短标题(10字以内):{text}")
| llm | parser
)
# 组合:并行执行三个子链,再用结果生成报告
report_chain = (
RunnableParallel(
title=title_chain,
summary=summary_chain,
keywords=keywords_chain,
)
| ChatPromptTemplate.from_template(
"请根据以下信息生成一份分析报告:\n"
"标题:{title}\n"
"摘要:{summary}\n"
"关键词:{keywords}\n\n"
"报告格式要求:\n"
"1. 标题\n2. 内容摘要(100字)\n3. 关键词列表\n4. 总结评价"
)
| llm
| parser
)
result = report_chain.invoke({
"text": "LangChain 是一个用于构建大语言模型应用的开源框架。"
"它提供了模块化的组件和链式调用机制,支持与多种模型和数据源集成。"
"最新版本引入了 LCEL 语法,让链的组合更加简洁优雅。"
})
print(result)
嵌套链示意图
链的复用与参数化
通过参数化子链,可以构建高度可复用的组件:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
def create_qa_chain(system_prompt: str, model: str = "gpt-4o-mini"):
"""创建可复用的问答链"""
llm = ChatOpenAI(model=model, temperature=0.3)
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{question}"),
])
return prompt | llm | StrOutputParser()
# 复用链:不同角色
code_reviewer = create_qa_chain(
"你是一个严格的代码审查专家。审查代码时关注:安全性、性能、可读性。"
)
tech_writer = create_qa_chain(
"你是一个技术文档工程师。擅长将复杂概念用简洁的语言表达。"
)
print(code_reviewer.invoke({"question": "审查这段代码:x = eval(input())"}))
print(tech_writer.invoke({"question": "解释什么是 Docker 容器"}))
链的配置与运行时参数
with_config 传递运行时配置
在实际应用中,我们经常需要在运行时动态修改链的行为,比如设置不同的温度、切换模型、添加标签等。LCEL 提供了 with_config 方法来传递运行时配置:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import ConfigurableField
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
prompt = ChatPromptTemplate.from_template("解释以下概念:{concept}")
# 使用 with_config 传递运行时配置
chain = prompt | llm | parser
# 传入配置(如 tags, metadata 等)
result = chain.invoke(
{"concept": "微服务架构"},
config={"tags": ["education"], "metadata": {"user": "student_001"}},
)
print(result)
configurable_fields 动态切换参数
configurable_fields 允许在调用时动态修改组件参数,最常见的用途是运行时切换模型:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import ConfigurableField
# 创建可配置的模型
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3).configurable_fields(
model=ConfigurableField(
id="model_name",
name="Model",
description="选择使用的模型",
),
temperature=ConfigurableField(
id="temperature",
name="Temperature",
description="控制输出随机性",
),
)
prompt = ChatPromptTemplate.from_template("解释:{concept}")
chain = prompt | llm | StrOutputParser()
# 默认配置
result1 = chain.invoke({"concept": "量子纠缠"})
print("默认:", result1[:50])
# 切换到其他模型
result2 = chain.invoke(
{"concept": "量子纠缠"},
config={"configurable": {"model_name": "gpt-4o"}},
)
print("GPT-4o:", result2[:50])
# 调整温度
result3 = chain.invoke(
{"concept": "量子纠缠"},
config={"configurable": {"temperature": 0.9}},
)
print("高温度:", result3[:50])
configurable_alternatives 动态切换组件
configurable_alternatives 可以在运行时切换整个组件(比如切换不同的模型提供商):
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import ConfigurableField
# 主模型
openai_llm = ChatOpenAI(model="gpt-4o-mini")
# 配置可替代的模型(示例中使用不同的 OpenAI 模型作为替代)
configurable_llm = openai_llm.configurable_alternatives(
ConfigurableField(id="llm_provider"),
default_key="openai",
# 可以添加其他提供商的模型
# anthropic=ChatAnthropic(model="claude-3-5-sonnet"),
# deepseek=ChatOpenAI(
# model="deepseek-chat",
# base_url="https://api.deepseek.com/v1",
# api_key="your-key",
# ),
high_quality=ChatOpenAI(model="gpt-4o"),
fast=ChatOpenAI(model="gpt-4o-mini", temperature=0.0),
)
prompt = ChatPromptTemplate.from_template("回答以下问题:{question}")
chain = prompt | configurable_llm | StrOutputParser()
# 使用默认模型(openai)
result1 = chain.invoke({"question": "什么是深度学习?"})
# 切换到高质量模型
result2 = chain.invoke(
{"question": "什么是深度学习?"},
config={"configurable": {"llm_provider": "high_quality"}},
)
# 切换到快速模型
result3 = chain.invoke(
{"question": "什么是深度学习?"},
config={"configurable": {"llm_provider": "fast"}},
)
[!tip] configurable_alternatives 的应用场景
这个功能非常适合需要支持多模型提供商的场景。比如一个 SaaS 产品需要根据用户的订阅级别提供不同质量的模型:免费用户使用轻量模型,付费用户使用高端模型。通过configurable_alternatives,无需修改链的代码,只需在调用时传入配置即可。
with_fallbacks 容错机制
当主模型出现故障时,自动切换到备用模型:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
primary_llm = ChatOpenAI(model="gpt-4o-mini")
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("回答:{question}")
parser = StrOutputParser()
# 主模型失败时自动切换到备用模型
chain = (
prompt
| primary_llm.with_fallbacks([fallback_llm])
| parser
)
result = chain.invoke({"question": "Python 是什么?"})
print(result)
链的序列化与部署
保存和加载链
LangChain 支持将链序列化为 JSON 格式,方便存储和部署:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 创建链
prompt = ChatPromptTemplate.from_template("用{style}风格解释:{concept}")
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
chain = prompt | llm | parser
# 保存链的配置
chain_config = {
"prompt_template": prompt.template if hasattr(prompt, "template") else str(prompt),
"model": "gpt-4o-mini",
"parser": "StrOutputParser",
}
import json
with open("chain_config.json", "w", encoding="utf-8") as f:
json.dump(chain_config, f, ensure_ascii=False, indent=2)
# 加载并重建链
with open("chain_config.json", "r", encoding="utf-8") as f:
loaded_config = json.load(f)
rebuilt_chain = (
ChatPromptTemplate.from_template(loaded_config["prompt_template"])
| ChatOpenAI(model=loaded_config["model"])
| StrOutputParser()
)
使用 LangServe 部署链
LangServe 是 LangChain 官方的部署工具,可以将链快速暴露为 REST API:
# server.py — LangServe 部署示例
from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langserve import add_routes
app = FastAPI(title="LangChain API Server")
# 创建链
prompt = ChatPromptTemplate.from_template("回答以下问题:{question}")
llm = ChatOpenAI(model="gpt-4o-mini")
chain = prompt | llm | StrOutputParser()
# 注册路由
add_routes(app, chain, path="/qa")
# 运行:uvicorn server:app --host 0.0.0.0 --port 8000
# client.py — 客户端调用
from langserve import RemoteRunnable
# 连接远程链
remote_chain = RemoteRunnable("http://localhost:8000/qa/")
# 调用
result = remote_chain.invoke({"question": "什么是微服务?"})
print(result)
# 也支持流式调用
for chunk in remote_chain.stream({"question": "什么是微服务?"}):
print(chunk, end="", flush=True)
[!warning] 序列化的限制
包含RunnableLambda(自定义函数)的链无法直接序列化,因为 Python 函数不能被 JSON 化。在实际部署中,建议将链的代码作为独立模块管理,序列化只用于存储模板和配置参数。
实战:构建多功能问答链
下面是一个综合实战案例,将本章所学的所有知识点整合到一个完整的应用中。
需求描述
构建一个智能问答系统,具备以下功能:
- 根据问题语言自动选择翻译方向
- 并行生成摘要和关键词
- 支持多种输出格式
- 带容错机制
完整代码实现
"""
多功能智能问答链 — 综合实战
整合:RunnableParallel、RunnablePassthrough、RunnableLambda、
RunnableBranch、configurable_fields、with_fallbacks
"""
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
RunnableParallel,
RunnablePassthrough,
RunnableLambda,
RunnableBranch,
ConfigurableField,
)
from operator import itemgetter
# ============================================================
# 1. 基础组件
# ============================================================
# 主模型 + 备用模型
primary_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3)
# 带容错的模型
llm = primary_llm.with_fallbacks([fallback_llm])
parser = StrOutputParser()
# ============================================================
# 2. 自定义工具函数(RunnableLambda)
# ============================================================
def detect_language(text: str) -> str:
"""检测文本语言"""
if any("一" <= c <= "鿿" for c in text):
return "chinese"
return "other"
def is_chinese_question(input_dict: dict) -> bool:
"""判断问题是否为中文"""
return detect_language(input_dict.get("question", "")) == "chinese"
def format_final_output(result: dict) -> str:
"""格式化最终输出"""
output = f"""
{'='*40}
分析报告
{'='*40}
问题:{result['question']}
语言:{result['language']}
{'─'*40}
回答:
{result['answer']}
{'─'*40}
摘要:{result['summary']}
关键词:{result['keywords']}
{'='*40}
""".strip()
return output
# ============================================================
# 3. 子链定义
# ============================================================
# 回答链
answer_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的技术顾问,回答要准确、有条理。"),
("human", "{question}"),
])
answer_chain = answer_prompt | llm | parser
# 摘要链
summary_prompt = ChatPromptTemplate.from_template(
"用一到两句话总结以下回答的要点:\n{answer}"
)
summary_chain = summary_prompt | llm | parser
# 关键词链
keywords_prompt = ChatPromptTemplate.from_template(
"从以下回答中提取5个最重要的技术关键词,用逗号分隔:\n{answer}"
)
keywords_chain = keywords_prompt | llm | parser
# 翻译链(条件分支)
translate_to_chinese = (
ChatPromptTemplate.from_template(
"将以下内容翻译为中文,只输出翻译结果:\n{question}"
)
| llm | parser
)
translate_to_english = (
ChatPromptTemplate.from_template(
"将以下内容翻译为英文,只输出翻译结果:\n{question}"
)
| llm | parser
)
# 条件翻译分支
translate_branch = RunnableBranch(
(is_chinese_question, translate_to_english),
translate_to_chinese,
)
# ============================================================
# 4. 组装主链
# ============================================================
# 第一步:并行检测语言 + 翻译
analysis_chain = RunnableParallel(
question=itemgetter("question"),
language=RunnableLambda(
lambda x: detect_language(x["question"])
),
translation=translate_branch,
)
# 第二步:基于翻译结果生成回答,并并行提取摘要和关键词
qa_chain = (
RunnablePassthrough.assign(
answer=itemgetter("translation")
| ChatPromptTemplate.from_template("基于以下内容回答原始问题:{translation}")
| llm
| parser,
)
| RunnablePassthrough.assign(
summary=itemgetter("answer") | summary_chain,
keywords=itemgetter("answer") | keywords_chain,
)
)
# 第三步:格式化输出
main_chain = analysis_chain | qa_chain | RunnableLambda(format_final_output)
# ============================================================
# 5. 运行
# ============================================================
if __name__ == "__main__":
# 测试1:中文问题
print(main_chain.invoke({"question": "什么是向量数据库?它有哪些应用场景?"}))
print("\n\n")
# 测试2:英文问题
print(main_chain.invoke({"question": "What is RAG and how does it work?"}))
# 测试3:流式输出
print("\n\n=== 流式输出测试 ===")
for chunk in main_chain.stream({"question": "解释 Transformer 架构"}):
print(chunk, end="", flush=True)
架构总览
关键知识点回顾
上述实战案例涉及的核心组件:
| 组件 | 用途 | 出现场景 |
|---|---|---|
RunnableParallel | 并行执行多个分支 | 语言检测 + 翻译同时进行 |
RunnablePassthrough.assign | 保留原始数据并添加新字段 | 逐步丰富输出字典 |
RunnableLambda | 自定义 Python 函数 | 语言检测、格式化输出 |
RunnableBranch | 条件分支 | 根据语言选择翻译方向 |
itemgetter | 提取字典字段 | 从中间结果中提取 answer |
with_fallbacks | 容错机制 | 主模型失败时切换备用模型 |
本章小结
核心概念一览
| 概念 | 一句话说明 | 核心方法 |
|---|---|---|
| LCEL | 声明式链组合语法 | | 管道操作符 |
| Runnable | 统一组件接口 | invoke / batch / stream |
| RunnableParallel | 并行执行多个分支 | 字典语法或类构造 |
| RunnablePassthrough | 透传输入或添加字段 | .assign() |
| RunnableLambda | 自定义函数包装 | @chain 装饰器 |
| RunnableBranch | 条件分支选择 | 条件-分支对 |
| configurable_fields | 运行时动态参数 | ConfigurableField |
| configurable_alternatives | 运行时切换组件 | ConfigurableField + alternatives |
| with_fallbacks | 自动容错 | 传入备用模型列表 |
LCEL 设计原则
学习建议
- 从基础链开始:先掌握
prompt | llm | parser这个最基本的组合,理解数据在管道中的流转方式 - 逐步增加复杂度:在基础链的基础上,依次学习
RunnableParallel、RunnablePassthrough、RunnableLambda等组件 - 多画流程图:在构建复杂链之前,先用流程图理清数据流经的路径和每一步的输入输出类型
- 善用调试:使用
.invoke()逐步调试每个子链的输出,确保数据类型在组件之间正确传递 - 关注类型:LCEL 链中最常见的错误就是类型不匹配。记住每个组件的输出类型,是避免 bug 的关键