LangChain开发(三)工作流编排(LCEL)
LCEL(Langchain Expression Language)是一种强大的工作流编排工具,可以从基本组件构建复杂任务链条(chain),并支持诸如流式处理、并行处理和日志记录等开箱即用的功能。LCEL从第一天起就被设计为支持将原型投入生产,无需更改代码,从最简单的“提示 + LLM” 链到最复杂的链(有人成功地在生产中运行了包含数百步的LCEL链)。以下是使用LCEL几个亮点。一流的流式支
文章目录
LCEL介绍
LCEL(Langchain Expression Language)是一种强大的工作流编排工具,可以从基本组件构建复杂任务链条(chain),并支持诸如流式处理、并行处理和日志记录等开箱即用的功能。
LCEL从第一天起就被设计为支持将原型投入生产,无需更改代码,从最简单的“提示 + LLM” 链到最复杂的链(有人成功地在生产中运行了包含数百步的LCEL链)。以下是使用LCEL几个亮点。
一流的流式支持 当使用 LCEL 构建链时,您将获得可能的最佳时间到第一个标记(直到输出的第一块内容出现所经过的时间)。对于某些链,这意味着我们直接从 LLM 流式传输标记到流式输出解析器,您将以与 LLM 提供程序输出原始标记的速率相同的速度获得解析的增量输出块。
异步支持 使用 LCEL 构建的任何链都可以通过同步 API(例如,在您使用 Jupyter 笔记本进行原型设计时)以及异步 API(例如,在 LangServe 服务器中)调用。这使得您可以使用相同的代码进行原型设计和生产部署,同时获得出色的性能,并能够在同一服务器上处理大量并发请求。
优化的并行执行 只要您的 LCEL 链中的步骤可以并行执行(例如,如果您从多个检索器中获取文档),我们都会自动进行并行处理,无论是在同步接口还是异步接口中,都会尽可能减少延迟。
重试和回退 为 LCEL 链的任何部分配置重试和回退。这是使您的链在规模上更可靠的好方法。我们目前正在努力为重试/回退添加流式支持,以便您在不增加延迟的情况下获得更高的可靠性。
访问中间结果 对于更复杂的链,通常非常有用的是能够在最终输出产生之前访问中间步骤的结果。这可以用来让最终用户知道正在发生某些事情,或者仅用于调试您的链。您可以流式传输中间结果,这在每个 LangServe 服务器上都是可用的。
输入和输出模式 输入和输出模式为每个LCEL链提供了从链的结构推断出 Pydantic 和 JSONSchema 模式。这可能用于验证输入和输出,并且是LangServe的一个组成部分。
原文地址:LangChain官网 LCEL文档
Runable interface
为了尽可能轻松地创建自定义链,我们实现了一个“Runnable”协议。许多 LangChain 组件都实现了 Runnable 协议,包括聊天模型、LLMs、输出解析器、检索器、提示模板等。此外,还有一些有用的基本组件可用于处理可运行对象。
这是一个标准接口,便于以标准化的方式定义和调用自定义链。标准接口包括:
stream:以流的形式返回响应块。invoke:对输入调用链。batch:对输入列表调用链。
这些方法还对应有异步方法,应该与asyncio一起使用await语法以实现并发:
astream:异步以流的形式返回响应块。ainvoke:异步对输入调用链。abatch:异步对输入列表调用链。astream_log:在执行过程中实时流式传输中间步骤以及最终响应。astream_events:beta 在链执行过程中实时流式传输事件(在langchain-core0.1.14 中引入)。
不同组件的输入类型和输出类型各不相同:
| 组件 | 输入类型 | 输出类型 |
|---|---|---|
| 提示模板 | 字典 | PromptValue |
| 聊天模型 | 单个字符串、聊天消息列表或 PromptValue | 聊天消息 |
| LLM | 单个字符串、聊天消息列表或 PromptValue | 字符串 |
| 输出解析器 | LLM 或聊天模型的输出 | 取决于解析器 |
| 检索器 | 单个字符串 | 文档列表 |
| 工具 | 单个字符串或字典,取决于工具 | 取决于工具 |
所有可运行对象都暴露了输入和输出模式,用于检查输入和输出:
input_schema:根据可运行对象的结构自动生成的输入 Pydantic 模型。output_schema:根据可运行对象的结构自动生成的输出 Pydantic 模型。
流式运行对于使基于LLM的应用程序对最终用户具有响应性至关重要。重要的LangChain组件,如聊天模型、输出解析器、提示模板、检索器和代理都实现了LangChain Runnable接口。该接口提供了两种通用的流式内容方法:
- 同步
stream和异步astream:流式传输链中的最终输出和默认实现 - 异步
astream_events和异步astream_log:这些方法提供了一种从链中流式传输中间步骤和最终输出的的方式。让我们看看这两种方法,并尝试理解如何使用它们。
Stream(流)
所有Runnable对象都实现了一个名为stream的同步方法和一个名为astream的异步变体。这些方法旨在以块的形式流式传输最终输出,尽快返回每个块。只有在程序中的所有步骤都知道如何处理输入流时,才能进行流式传输;即,逐个处理输入块,并产生响应的输出快。这种处理的复杂性可以有所不同,从简单的任务,如发出LLM生成的令牌,到更具挑战性的任务,如在整个JSON完成之前流式传输JSON结果的部分。开始探索流式传输的最佳方法是从LLM应用程序中最重要的组件开始——LLM本身!
LLM和聊天模型
大型语言模型及其聊天变体是基于LLM的应用程序的主要瓶颈。大型语言模型可能需要几秒钟才能对查询生成完整的响应。这比应用程序对最红用户具有响应性的约200-300毫秒的阈值要慢的多。使应用程序具有更高的响应行的关键策略是显示中间进度;即逐个令牌流式输出传输模型的输出。我们将展示使用聊天模型进行流式传输的示例。从以下选项中选择一个:
让我们从同步StreamAPI开始:
# 流式输出
from langchain_openai import ChatOpenAI
llm = ChatOpenAI()
chunks = []
for chunk in llm.stream("天空为什么是蓝色的?"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
输出示例:
|天空|之|所以|是|蓝|色|的|,|主要|是|由于|**|瑞|利|散|射|**|的|现|象|。当|太阳|光|进入|地|球|大|气|层|时|,它|会|与|空气|中的|分|子|和|微|小|颗|粒|发生|相|互|作用|。|太阳|光|其实|是|由|各种|颜色|的|光|组成|的|,这|些|颜色|的|光|波|长|不同|。|蓝|光|的|波|长|较|短|,比|红|光|和|其他|颜色|的|光|更|容易|发生|散|射|。
|当|太阳|光|通过|大|气|时|,|蓝|色|的|光|波|(|波|长|较|短|)|比|红|色|的|光|波|(|波|长|较|长|)|更|频|繁|地|被|空气|分|子|散|射|。所以|,|散|射|后的|蓝|光|在|天空|中|占|据|主|导|地|位|,使|得|我们|从|地|面|看|向|天空|时|,|看到|的|主要|是|蓝|色|。
|简单|来说|,|天空|之|所以|看|起来|是|蓝|色|的|,|跟|蓝|色|光|被|散|射|得|更多|有|很|大|关系|。||
这里之所以中间带"|"是因为
end="|",参数,将其改为end="",展示出来的就没有“|”了。这里拿竖线分割后,也可以很直观的看到一个token不一样只有一个字。
Chain(链)
几乎所有的LLM应用程序都涉及不止一步的操作,而不仅仅是调用语言模型。让我们使用LangChain表达式语言 LCEL 构建一个简单的链,该链结合了一个提示、模型和解析器,并验证流式传输是否正常工作。我们将使用StrOutputParser来解析模型的输出,。这是一个简单的解析器,从AIMessageChunk中提取content字段,给出模型返回的token。
LCEL是一种声明式的方式,通过将不同的LangChain组件链接在一起指定一个“程序”。使用LCEL创建链可以自动实现Stream和astream,从而实现对最终输出的流式传输。事实上,使用LCEL创建的链实现了整个标准Runnable接口。
from langchain_openai import ChatOpenAI
# 为了支持异步调用
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
llm = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | llm | parser
async def async_stream():
async for chunk in chain.astream({"topic": "老虎"}):
print(chunk, end="", flush=True)
# 运行异步流处理
asyncio.run(async_stream())
请注意,即使我们在上面链条末尾使用了parse,我们仍然可以获得流式输出。parser会对每个流失块进行操作。许多LCEL基元也支持这种转换式的流式传递,这在构建应用程序时非常方便。
自定义函数可以被设计为返回生成器,这样就能够操作流。
某些可运行实体,如提示模板和聊天模型,无法处理耽搁块,而是聚合所有之前的步骤。这些可运行实体可以中断流处理。
LangChain表达语言允许将链的构建与使用模式(例如同步/异步、批处理/流式等)分开。如果这与您构建的内容无关,您也可以依赖于标准的命令式变成方法,通过在每个组件上调用invoke、batch和stream,将结果分配给变量,然后根据需要在下游使用它们。
Working with input Streams (使用输入流)
如果想要在输出生成时从中流式传输JSON,该怎么办呢?
如果依赖json.loads来解析部分JSON,那么解析将失败,因为部分JSON不会是有效的JSON.
这可能会束手无策,声称无法流式传输JSON。
事实上,有一种方法可以做到这一点——解析器需要在输入流上操作,并尝试将部分JSON"自动完成"为有效状态。
import asyncio
from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
chain = (
llm | JsonOutputParser()
) # 由于较旧版本的 Langchain 中存在的 bug,JsonOutputParser 未从某些模型中流式传输结果
async def async_stream():
async for text in chain.astream(
"以json格式输出法国、西班牙和日本的国家及其人口列表。"
'使用一个带有"countries"外部键的字典,其中包含国家列表。'
"每个国家都应该有键`name`和`population`"
):
print(text, flush=True)
# 使用 asyncio.run() 运行异步函数
asyncio.run(async_stream())
输出示例:
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 670}]}
{'countries': [{'name': 'France', 'population': 670810}]}
{'countries': [{'name': 'France', 'population': 67081000}]}
{'countries': [{'name': 'France', 'population': 67081000}, {}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 473}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 473500}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125800}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125800000}]}
Stream events(事件流)
现在我们已经了解了stream和astream的工作原理,让我们进入事件流的世界。
事件流是一个beta API。这个API可能会根据反馈略微更改。
这里演示了V2API,需要langchain-core >= 0.2。对于与旧版本LangChian兼容的V1API,请参阅这里。
查看langchain版本:
import langchain_core
print(langchain_core.__version__)
为了使astream_events API正常工作:
- 在代码中尽可能使用
async(例如,异步工具等) - 如果定义自定义函数/可运行项,请传播回调
- 在没有LCEL的情况下使用可运行项,请确保在LLMs调用
.astream()而不是.ainvoke以强制LLM流式传输令牌
事件参考
下面是一个参考表,显示各种可运行对象可能发出的一些实践。
当流式传输正确实现时,对于可运行项的输入知道输入流完成小号后才会知道。这意味着inputs通常仅包括end实践,而不是包括start事件。
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | [model name] | {“messages”: [[SystemMessage, HumanMessage]]} | ||
| on_chat_model_stream | [model name] | AIMessageChunk(content=“hello”) | ||
| on_chat_model_end | [model name] | {“messages”: [[SystemMessage, HumanMessage]]} | AIMessageChunk(content=“hello world”) | |
| on_llm_start | [model name] | {‘input’: ‘hello’} | ||
| on_llm_stream | [model name] | ‘Hello’ | ||
| on_llm_end | [model name] | ‘Hello human!’ | ||
| on_chain_start | format_docs | |||
| on_chain_stream | format_docs | “hello world!, goodbye world!” | ||
| on_chain_end | format_docs | [Document(…)] | “hello world!, goodbye world!” | |
| on_tool_start | some_tool | {“x”: 1, “y”: “2”} | ||
| on_tool_end | some_tool | {“x”: 1, “y”: “2”} | ||
| on_retriever_start | [retriever name] | {“query”: “hello”} | ||
| on_retriever_end | [retriever name] | {“query”: “hello”} | [Document(…), …] | |
| on_prompt_start | [template_name] | {“question”: “hello”} | ||
| on_prompt_end | [template_name] | {“question”: “hello”} | ChatPromptValue(messages: [SystemMessage, …]) |
获取事件信息,方便排查问题
from langchain_openai import ChatOpenAI
import asyncio
llm = ChatOpenAI()
async def async_stream():
events = []
async for event in llm.astream_events("hello", version="v2"):
events.append(event)
print(events)
asyncio.run(async_stream())
源码地址
https://github.com/lys1313013/langchain-example/tree/main/03-stream
参考资料
B站:2025吃透LangChain大模型全套教程(LLM+RAG+OpenAI+Agent)第3集
LangChain官网:Runnable interface文档
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)