01. 多组件 invoke 嵌套的缺点

在前面的课时中,我们使用多个组件的 invoke 进行嵌套来创建 LLM 应用,示例代码如下

prompt = ChatPromptTemplate.from_template("{query}")

llm = ChatOpenAI(model="gpt-3.5-turbo-16k")

parser = StrOutputParser()

# 获取输出内容

content = parser.invoke(

    llm.invoke(

        prompt.invoke(

            {"query": req.query.data}

        )

    )

)

这种写法虽然能实现对应的功能,但是存在很多缺陷:

  1. 嵌套式写法让程序的维护性与可阅读性大大降低,当需要修改某个组件时,变得异常困难。
  2. 没法得知每一步的具体结果与执行进度,出错时难以排查。
  3. 嵌套式写法没法集成大量的组件,组件越来越多时,代码会变成“一次性”代码。

前端代码中的嵌套/回调地狱

思考:能否将嵌套的写法改成平级的调用,这样就可以屏蔽嵌套带来的大量缺陷。

02. 手写一个"Chain"优化代码

观察发现,虽然 Prompt、Model、OutputParser 分别有自己独立的调用方式,例如:

  • Prompt 组件:format、invoke、to_string、to_messages。
  • Model 组件:generate、invoke、batch。
  • OutputParser 组件:parse、invoke。

但是有一个共同的调用方法:invoke,并且每一个组件的输出都是下一个组件的输入,是否可以将所有组件组装得到一个列表,然后循环依次调用 invoke 执行每一个组件,然后将当前组件的输出作为下一个组件的输入。

源码实现

from typing import Any

import dotenv

from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

prompt = ChatPromptTemplate.from_template("{query}")

llm = ChatOpenAI(model="gpt-3.5-turbo-16k")

parser = StrOutputParser()

class Chain:

    steps: list = []

    def __init__(self, steps: list):

        self.steps = steps

    def invoke(self, input: Any) -> Any:

        output: Any = input

        for step in self.steps:

            output = step.invoke(output)

            print(step)

            print("执行结果:", output)

            print("===============")

        return output

chain = Chain([prompt, llm, parser])

print(chain.invoke({"query": "你好,你是?"}))

输出内容:

input_variables=['query'] messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['query'], template='{query}'))]

执行结果: messages=[HumanMessage(content='你好,你是?')]

===============

client=<openai.resources.chat.completions.Completions object at 0x000001C6BF694310> async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x000001C6BF695BD0> model_name='gpt-3.5-turbo-16k' openai_api_key=SecretStr('**********') openai_api_base='https://api.xty.app/v1' openai_proxy=''

执行结果: content='你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样的问题,帮助解决问题,提供信息和创意。有什么我可以帮助你的吗?' response_metadata={'token_usage': {'completion_tokens': 72, 'prompt_tokens': 13, 'total_tokens': 85}, 'model_name': 'gpt-3.5-turbo-16k', 'system_fingerprint': 'fp_b28b39ffa8', 'finish_reason': 'stop', 'logprobs': None} id='run-5bf9e183-4b28-4be9-bf65-ce0ad9590785-0'

===============

执行结果: 你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样的问题,帮助解决问题,提供信息和创意。有什么我可以帮助你的吗?

===============

你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样的问题,帮助解决问题,提供信息和创意。有什么我可以帮助你的吗?

通过自定义"Chain"的方式虽然简化了过程,也支持观察,不过功能过于简陋,在 LangChain 中既然 Prompt、Model、OutputParser 均支持 invoke 方法,底层是否统一了某种规范?并支持以更简单的方式进行调用?

03. Runnable 简介与 LCEL 表达式

为了尽可能简化创建自定义链,LangChain 官方实现了一个 Runnable 协议,这个协议适用于 LangChain 中的绝大部分组件,并实现了大量的标准接口,涵盖:

  1. stream:将组件的响应块流式返回,如果组件不支持流式则会直接输出。
  2. invoke:调用组件并得到对应的结果。
  3. batch:批量调用组件并得到对应的结果。
  4. astream:stream 的异步版本。
  5. ainvoke:invoke 的异步版本。
  6. abatch:batch 的异步版本。
  7. astream_log:除了流式返回最终响应块之外,还会流式返回中间步骤。

除此之外,在 Runnable 中还重写了 __or__ 和 __ror__ 方法,这是 Python 中 | 运算符的计算逻辑,所有的 Runnable 组件,均可以通过 | 或者 pipe() 的方式将多个组件拼接起来形成一条链

from typing import Any

import dotenv

from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

prompt = ChatPromptTemplate.from_template("{query}")

llm = ChatOpenAI(model="gpt-3.5-turbo-16k")

parser = StrOutputParser()

chain = prompt | llm | parser

# 等价于以下写法

composed_chain_with_pipe = (

RunnableParallel({"query": RunnablePassthrough()})

    .pipe(prompt)

    .pipe(llm)

    .pipe(parser)

)

print(chain.invoke({"query": "你好,你是?"}))

Runnable 底层的运行逻辑本质上也是将每一个组件添加到列表中,然后按照顺序执行并返回最终结果,核心源码

def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:

    from langchain_core.beta.runnables.context import config_with_context

    # setup callbacks and context

    config = config_with_context(ensure_config(config), self.steps)

    callback_manager = get_callback_manager_for_config(config)

    # start the root run

    run_manager = callback_manager.on_chain_start(

        dumpd(self),

        input,

        name=config.get("run_name") or self.get_name(),

        run_id=config.pop("run_id", None),

    )

    # 调用所有步骤并逐个执行得到对应的输出,然后作为下一个的输入

    try:

        for i, step in enumerate(self.steps):

            input = step.invoke(

                input,

                # mark each step as a child run

                patch_config(

                    config, callbacks=run_manager.get_child(f"seq:step:{i+1}")

                ),

            )

    # finish the root run

    except BaseException as e:

        run_manager.on_chain_error(e)

        raise

    else:

        run_manager.on_chain_end(input)

        return cast(Output, input)

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐