LCEL介绍

LCEL(Langchain Expression Language)是一种强大的工作流编排工具,可以从基本组件构建复杂任务链条(chain),并支持诸如流式处理、并行处理和日志记录等开箱即用的功能。

LCEL从第一天起就被设计为支持将原型投入生产,无需更改代码,从最简单的“提示 + LLM” 链到最复杂的链(有人成功地在生产中运行了包含数百步的LCEL链)。以下是使用LCEL几个亮点。

一流的流式支持 当使用 LCEL 构建链时,您将获得可能的最佳时间到第一个标记(直到输出的第一块内容出现所经过的时间)。对于某些链,这意味着我们直接从 LLM 流式传输标记到流式输出解析器,您将以与 LLM 提供程序输出原始标记的速率相同的速度获得解析的增量输出块。

异步支持 使用 LCEL 构建的任何链都可以通过同步 API(例如,在您使用 Jupyter 笔记本进行原型设计时)以及异步 API(例如,在 LangServe 服务器中)调用。这使得您可以使用相同的代码进行原型设计和生产部署,同时获得出色的性能,并能够在同一服务器上处理大量并发请求。

优化的并行执行 只要您的 LCEL 链中的步骤可以并行执行(例如,如果您从多个检索器中获取文档),我们都会自动进行并行处理,无论是在同步接口还是异步接口中,都会尽可能减少延迟。

重试和回退 为 LCEL 链的任何部分配置重试和回退。这是使您的链在规模上更可靠的好方法。我们目前正在努力为重试/回退添加流式支持,以便您在不增加延迟的情况下获得更高的可靠性。

访问中间结果 对于更复杂的链,通常非常有用的是能够在最终输出产生之前访问中间步骤的结果。这可以用来让最终用户知道正在发生某些事情,或者仅用于调试您的链。您可以流式传输中间结果,这在每个 LangServe 服务器上都是可用的。

输入和输出模式 输入和输出模式为每个LCEL链提供了从链的结构推断出 Pydantic 和 JSONSchema 模式。这可能用于验证输入和输出,并且是LangServe的一个组成部分。

原文地址:LangChain官网 LCEL文档

Runable interface

为了尽可能轻松地创建自定义链,我们实现了一个“Runnable”协议。许多 LangChain 组件都实现了 Runnable 协议,包括聊天模型、LLMs、输出解析器、检索器、提示模板等。此外,还有一些有用的基本组件可用于处理可运行对象。

这是一个标准接口,便于以标准化的方式定义和调用自定义链。标准接口包括:

  • stream:以流的形式返回响应块。
  • invoke:对输入调用链。
  • batch:对输入列表调用链。

这些方法还对应有异步方法,应该与asyncio一起使用await语法以实现并发:

  • astream:异步以流的形式返回响应块。
  • ainvoke:异步对输入调用链。
  • abatch:异步对输入列表调用链。
  • astream_log:在执行过程中实时流式传输中间步骤以及最终响应。
  • astream_eventsbeta 在链执行过程中实时流式传输事件(在 langchain-core 0.1.14 中引入)。

不同组件的输入类型输出类型各不相同:

组件 输入类型 输出类型
提示模板 字典 PromptValue
聊天模型 单个字符串、聊天消息列表或 PromptValue 聊天消息
LLM 单个字符串、聊天消息列表或 PromptValue 字符串
输出解析器 LLM 或聊天模型的输出 取决于解析器
检索器 单个字符串 文档列表
工具 单个字符串或字典,取决于工具 取决于工具

所有可运行对象都暴露了输入和输出模式,用于检查输入和输出:

  • input_schema:根据可运行对象的结构自动生成的输入 Pydantic 模型。
  • output_schema:根据可运行对象的结构自动生成的输出 Pydantic 模型。

原文地址:LangChain官网 Runnable interface

流式运行对于使基于LLM的应用程序对最终用户具有响应性至关重要。重要的LangChain组件,如聊天模型、输出解析器、提示模板、检索器和代理都实现了LangChain Runnable接口。该接口提供了两种通用的流式内容方法:

  1. 同步stream和异步astream:流式传输链中的最终输出默认实现
  2. 异步astream_events和异步astream_log:这些方法提供了一种从链中流式传输中间步骤最终输出的的方式。让我们看看这两种方法,并尝试理解如何使用它们。

Stream(流)

所有Runnable对象都实现了一个名为stream的同步方法和一个名为astream的异步变体。这些方法旨在以块的形式流式传输最终输出,尽快返回每个块。只有在程序中的所有步骤都知道如何处理输入流时,才能进行流式传输;即,逐个处理输入块,并产生响应的输出快。这种处理的复杂性可以有所不同,从简单的任务,如发出LLM生成的令牌,到更具挑战性的任务,如在整个JSON完成之前流式传输JSON结果的部分。开始探索流式传输的最佳方法是从LLM应用程序中最重要的组件开始——LLM本身!

LLM和聊天模型

大型语言模型及其聊天变体是基于LLM的应用程序的主要瓶颈。大型语言模型可能需要几秒钟才能对查询生成完整的响应。这比应用程序对最红用户具有响应性的约200-300毫秒的阈值要慢的多。使应用程序具有更高的响应行的关键策略是显示中间进度;即逐个令牌流式输出传输模型的输出。我们将展示使用聊天模型进行流式传输的示例。从以下选项中选择一个:

让我们从同步StreamAPI开始:

# 流式输出
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

chunks = []
for chunk in llm.stream("天空为什么是蓝色的?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

输出示例:

|天空|之|所以|是|蓝|色|的|,|主要|是|由于|**|瑞|利|散|射|**|的|现|象|。当|太阳|光|进入|地|球|大|气|层|时|,它|会|与|空气|中的|分|子|和|微|小|颗|粒|发生|相|互|作用|。|太阳|光|其实|是|由|各种|颜色|的|光|组成|的|,这|些|颜色|的|光|波|长|不同|。|蓝|光|的|波|长|较|短|,比|红|光|和|其他|颜色|的|光|更|容易|发生|散|射|。

|当|太阳|光|通过|大|气|时|,|蓝|色|的|光|波|(|波|长|较|短|)|比|红|色|的|光|波|(|波|长|较|长|)|更|频|繁|地|被|空气|分|子|散|射|。所以|,|散|射|后的|蓝|光|在|天空|中|占|据|主|导|地|位|,使|得|我们|从|地|面|看|向|天空|时|,|看到|的|主要|是|蓝|色|。

|简单|来说|,|天空|之|所以|看|起来|是|蓝|色|的|,|跟|蓝|色|光|被|散|射|得|更多|有|很|大|关系|。||

这里之所以中间带"|"是因为end="|",参数,将其改为end="",展示出来的就没有“|”了。

这里拿竖线分割后,也可以很直观的看到一个token不一样只有一个字。

Chain(链)

几乎所有的LLM应用程序都涉及不止一步的操作,而不仅仅是调用语言模型。让我们使用LangChain表达式语言 LCEL 构建一个简单的链,该链结合了一个提示、模型和解析器,并验证流式传输是否正常工作。我们将使用StrOutputParser来解析模型的输出,。这是一个简单的解析器,从AIMessageChunk中提取content字段,给出模型返回的token

LCEL是一种声明式的方式,通过将不同的LangChain组件链接在一起指定一个“程序”。使用LCEL创建链可以自动实现Streamastream,从而实现对最终输出的流式传输。事实上,使用LCEL创建的链实现了整个标准Runnable接口。

from langchain_openai import ChatOpenAI
# 为了支持异步调用
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
llm = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | llm | parser


async def async_stream():
    async for chunk in chain.astream({"topic": "老虎"}):
        print(chunk, end="", flush=True)


# 运行异步流处理
asyncio.run(async_stream())

请注意,即使我们在上面链条末尾使用了parse,我们仍然可以获得流式输出。parser会对每个流失块进行操作。许多LCEL基元也支持这种转换式的流式传递,这在构建应用程序时非常方便。

自定义函数可以被设计为返回生成器,这样就能够操作流。

某些可运行实体,如提示模板和聊天模型,无法处理耽搁块,而是聚合所有之前的步骤。这些可运行实体可以中断流处理。

LangChain表达语言允许将链的构建与使用模式(例如同步/异步、批处理/流式等)分开。如果这与您构建的内容无关,您也可以依赖于标准的命令式变成方法,通过在每个组件上调用invoke、batch和stream,将结果分配给变量,然后根据需要在下游使用它们。

Working with input Streams (使用输入流)

如果想要在输出生成时从中流式传输JSON,该怎么办呢?

如果依赖json.loads来解析部分JSON,那么解析将失败,因为部分JSON不会是有效的JSON.

这可能会束手无策,声称无法流式传输JSON。

事实上,有一种方法可以做到这一点——解析器需要在输入流上操作,并尝试将部分JSON"自动完成"为有效状态。

import asyncio

from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
chain = (
        llm | JsonOutputParser()
) # 由于较旧版本的 Langchain 中存在的 bug,JsonOutputParser 未从某些模型中流式传输结果


async def async_stream():
    async for text in chain.astream(
            "以json格式输出法国、西班牙和日本的国家及其人口列表。"
            '使用一个带有"countries"外部键的字典,其中包含国家列表。'
            "每个国家都应该有键`name`和`population`"
    ):
        print(text, flush=True)

# 使用 asyncio.run() 运行异步函数
asyncio.run(async_stream())

输出示例:

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 670}]}
{'countries': [{'name': 'France', 'population': 670810}]}
{'countries': [{'name': 'France', 'population': 67081000}]}
{'countries': [{'name': 'France', 'population': 67081000}, {}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 473}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 473500}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125800}]}
{'countries': [{'name': 'France', 'population': 67081000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125800000}]}

代码来自于LangChain中文网:使用 LangChain 进行流式处理

Stream events(事件流)

现在我们已经了解了streamastream的工作原理,让我们进入事件流的世界。

事件流是一个beta API。这个API可能会根据反馈略微更改。

这里演示了V2API,需要langchain-core >= 0.2。对于与旧版本LangChian兼容的V1API,请参阅这里

查看langchain版本:

import langchain_core
print(langchain_core.__version__)

为了使astream_events API正常工作:

  • 在代码中尽可能使用async(例如,异步工具等)
  • 如果定义自定义函数/可运行项,请传播回调
  • 在没有LCEL的情况下使用可运行项,请确保在LLMs调用.astream()而不是.ainvoke以强制LLM流式传输令牌

事件参考

下面是一个参考表,显示各种可运行对象可能发出的一些实践。

当流式传输正确实现时,对于可运行项的输入知道输入流完成小号后才会知道。这意味着inputs通常仅包括end实践,而不是包括start事件。

event name chunk input output
on_chat_model_start [model name] {“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream [model name] AIMessageChunk(content=“hello”)
on_chat_model_end [model name] {“messages”: [[SystemMessage, HumanMessage]]} AIMessageChunk(content=“hello world”)
on_llm_start [model name] {‘input’: ‘hello’}
on_llm_stream [model name] ‘Hello’
on_llm_end [model name] ‘Hello human!’
on_chain_start format_docs
on_chain_stream format_docs “hello world!, goodbye world!”
on_chain_end format_docs [Document(…)] “hello world!, goodbye world!”
on_tool_start some_tool {“x”: 1, “y”: “2”}
on_tool_end some_tool {“x”: 1, “y”: “2”}
on_retriever_start [retriever name] {“query”: “hello”}
on_retriever_end [retriever name] {“query”: “hello”} [Document(…), …]
on_prompt_start [template_name] {“question”: “hello”}
on_prompt_end [template_name] {“question”: “hello”} ChatPromptValue(messages: [SystemMessage, …])

获取事件信息,方便排查问题

from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI()


async def async_stream():
    events = []
    async for event in llm.astream_events("hello", version="v2"):
        events.append(event)
    print(events)


asyncio.run(async_stream())

源码地址

https://github.com/lys1313013/langchain-example/tree/main/03-stream

参考资料

B站:2025吃透LangChain大模型全套教程(LLM+RAG+OpenAI+Agent)第3集

LangChain官网:LCEL文档

LangChain官网:Runnable interface文档

LangChain官网:Using Stream Events 文档

LangChain中文网:使用 LangChain 进行流式处理

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐