一、核心摘要

核心观点: LangChain 1.0 于 2025 年 10 月 23 日正式发布,标志着 AI Agent 开发进入生产就绪阶段。本系统梳理从 0.x 到 1.0 的演进脉络、核心架构理念及实战应用。

内容概括:

  • 版本演进: 从 Chain 设计转向 Agent 优先,引入中间件机制、统一抽象层
  • 架构革新: 静态/动态模型选择、工具调用优化、结构化输出策略
  • 生产级特性: 中间件钩子、状态管理、流式输出、错误处理

价值说明: 本指南适合 AI 工程师、架构师及技术决策者阅读,提供从设计到落地的完整路径。文中所有代码示例均基于 Python 3.10+ 环境。


二、LangChain 版本演进全景

2.1 版本时间线与里程碑

版本 发布时间 核心特征 定位 引用来源
0.1.x 2023年初 简单 Chain 组合 实验阶段 [7†]
0.2.x 2024年中 Agent 原型出现 试生产 [3†]
0.3.x 2025年初 多模态支持 工程化探索 [3†]
1.0.0 2025年10月23日 生产就绪架构 正式版 [1†]

版本演进逻辑: LangChain 经历了"实验验证 → 工程化探索 → 生产就绪"三个阶段。1.0 版本终结了此前架构碎片化局面,以"生产就绪"为核心定位,重新定义智能 Agent 开发范式 [1†]。

2.2 核心架构理念的变迁

0.x 时代的特征与局限

特征描述: 0.x 版本以 Chain 为核心设计理念,强调线性组合的执行流程。

主要局限:

  1. 僵化的 Chain 设计: 难以支持复杂的分支逻辑和动态决策 [6†]
  2. 缺少生产级控制: 缺乏中间件钩子,难以实现日志、监控、重试等企业级需求
  3. 模型切换重复工作: 不同模型间切换需要大量代码修改 [6†]
1.0 代的架构革新

核心变化: LangChain 1.0 彻底重构 Agent 开发范式,主要变化集中在 create_agent() 方法 [4†]:

  1. 统一 Agent 抽象: 提供标准化接口,支持多种执行模式
  2. 中间件架构: 提供 5 大钩子函数(before_model、after_model、wrap_model_call、wrap_tool_call、before_agentExecute)[2†]
  3. 静态/动态模型选择: 支持运行时根据上下文动态切换模型 [0†]
  4. 结构化输出优化: 引入 ToolStrategy 和 ProviderStrategy 策略模式 [9†]

架构对比:

维度 0.x 版本 1.0 版本
设计范式 Chain 线性组合 Agent 循环 + 中间件
模型管理 静态绑定 静态/动态选择
扩展性 有限,需修改源码 高,通过中间件钩子
生产级特性 缺乏完善的监控 内置日志、重试、限流

小结: 1.0 版本通过中间件机制和统一抽象,解决了 0.x 版本在工程化、生产可控性方面的不足,为构建企业级 Agent 系统奠定了基础。


三、LangChain 1.0 核心架构深度解析

3.1 系统架构总览

LangChain 1.0 的 Agent 系统采用分层架构设计:

┌─────────────────────────────────────────────┐
│              用户接口层(UI/API)              │
├─────────────────────────────────────────────┤
│           中间件层(Middleware)              │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐      │
│  │日志监控  │ │权限控制  │ │错误处理  │      │
│  └─────────┘ └─────────┘ └─────────┘      │
├─────────────────────────────────────────────┤
│              Agent 运行时层                   │
│  ┌──────────────────────────────────────┐   │
│  │    状态管理(State Management)       │   │
│  │    工具调度(Tool Orchestration)     │   │
│  │    模型接口(Model Interface)        │   │
│  └──────────────────────────────────────┘   │
├─────────────────────────────────────────────┤
│              工具层(Tools)                  │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐      │
│  │搜索工具  │ │数据库工具  │ │自定义工具  │      │
│  └─────────┘ └─────────┘ └─────────┘      │
└─────────────────────────────────────────────┘

架构说明:

  • 用户接口层: 提供 REST API、gRPC、Streamlit 等多种交互方式
  • 中间件层: 通过钩子函数实现横切关注点(日志、监控、权限等)
  • Agent 运行时层: 核心执行引擎,管理状态、调度工具和模型
  • 工具层: 可插拔的工具生态,支持 HTTP、数据库、第三方 API 等

3.2 核心组件详解

3.2.1 模型(Model)抽象层

LangChain 1.0 提供了统一的模型接口,支持多种模型提供商。

1. 静态模型配置

静态模型在创建 Agent 时一次性指定,执行期间保持不变:

from langchain.agents import create_agent
from langchain_openai import ChatOpenAI

# 方式1:使用模型标识符字符串
agent = create_agent(
    model="gpt-4o",  # 自动推断为 openai:gpt-4o
    tools=[search_tool, get_weather],
    system_prompt="You are a helpful assistant."
)

# 方式2:直接使用模型实例
model = ChatOpenAI(
    model="gpt-4o",
    temperature=0.1,
    max_tokens=1000,
    timeout=30
)
agent = create_agent(
    model=model,
    tools=[search_tool, get_weather]
)

2. 动态模型选择

动态模型允许在运行时根据状态和上下文自动切换:

from langchain.agents.middleware import wrap_model_call

basic_model = ChatOpenAI(model="gpt-4o-mini")
advanced_model = ChatOpenAI(model="gpt-4o")

@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
    """根据对话复杂度选择模型"""
    message_count = len(request.state["messages"])
    
    if message_count > 10:
        # 长对话使用高级模型
        model = advanced_model
    else:
        # 短对话使用基础模型
        model = basic_model
    
    return handler(request.override(model=model))

agent = create_agent(
    model=basic_model,  # 默认模型
    tools=[search_tool, get_weather],
    middleware=[dynamic_model_selection]
)

应用场景: 动态模型选择适用于成本优化(简单任务用便宜模型)、性能优化(复杂任务用强模型)、多租户场景(不同用户用不同模型)等。

3.2.2 工具(Tools)系统

LangChain 1.0 的工具系统支持多种工具调用模式,提供了强大的灵活性。

1. 工具定义与注册

from langchain.tools import tool

# 方式1:使用装饰器定义工具
@tool
def search(query: str) -> str:
    """搜索信息"""
    return f"Results for: {query}"

@tool
def get_weather(location: str) -> str:
    """获取天气信息"""
    return f"Weather in {location}: Sunny, 72°F"

# 方式2:继承 BaseTool
from langchain.tools import BaseTool

class DatabaseTool(BaseTool):
    name = "database_query"
    description = "Execute database queries"
    
    def _run(self, query: str) -> str:
        # 执行数据库查询
        return f"Result for query: {query}"
    
    async def _arun(self, query: str) -> str:
        return self._run(query)

# 注册工具到 Agent
agent = create_agent(
    model="gpt-4o",
    tools=[search, get_weather, DatabaseTool()]
)

2. 工具错误处理

from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage

@wrap_tool_call
def handle_tool_errors(request, handler):
    """处理工具执行错误"""
    try:
        return handler(request)
    except Exception as e:
        # 返回自定义错误消息给模型
        return ToolMessage(
            content=f"Tool error: Please check your input and try again. ({str(e)})",
            tool_call_id=request.tool_call["id"]
        )

agent = create_agent(
    model="gpt-4o",
    tools=[search, get_weather],
    middleware=[handle_tool_errors]
)

3. 并行与串行工具调用

LangChain 1.0 支持智能调度多个工具调用:

# 串行工具调用
agent = create_agent(
    model="gpt-4o",
    tools=[search, get_weather, analyze_data]
)

# 并行工具调用(通过中间件实现)
from langchain.agents.middleware import ParallelToolExecution

parallel_executor = ParallelToolExecution(
    tools=[search, get_weather],
    max_concurrent=3
)

agent = create_agent(
    model="gpt-4o",
    tools=[analyze_data],
    middleware=[parallel_executor]
)
3.2.3 中间件(Middleware)机制

中间件是 LangChain 1.0 的核心创新,提供了强大的扩展能力。

中间件生命周期钩子:

from langchain.agents.middleware import AgentMiddleware

class CustomMiddleware(AgentMiddleware):
    """自定义中间件"""
    
    def before_model(self, state, runtime):
        """模型调用前钩子"""
        # 可以修改 state、添加日志、权限检查等
        print(f"Before model call, messages count: {len(state['messages'])}")
        return None
    
    def after_model(self, state, runtime, response):
        """模型调用后钩子"""
        # 可以处理模型响应、添加监控等
        print(f"After model call, response: {response}")
        return None
    
    def wrap_model_call(self, request, handler):
        """包装模型调用(替代 before_model + after_model)"""
        # 自定义模型选择逻辑
        return handler(request)
    
    def wrap_tool_call(self, request, handler):
        """包装工具调用"""
        # 自定义工具执行逻辑
        return handler(request)
    
    def before_agent_execute(self, state, runtime):
        """Agent 执行前钩子"""
        return None

# 使用中间件
agent = create_agent(
    model="gpt-4o",
    tools=[search, get_weather],
    middleware=[CustomMiddleware()]
)

内置中间件示例:

from langchain.agents.middleware import SummarizationMiddleware, HumanInTheLoopMiddleware

# 1. 摘要中间件(自动总结历史消息)
summarizationMiddleware = SummarizationMiddleware(
    model="claude-sonnet-4-5-20250929",
    trigger={"tokens": 1000}  # 当消息超过1000个token时触发摘要
)

# 2. 人机循环中间件(允许人工介入审批)
humanInTheLoopMiddleware = HumanInTheLoopMiddleware(
    interruptOn={
        "sendEmail": {
            "allowedDecisions": ["approve", "edit", "reject"]
        }
    }
)

agent = create_agent(
    model="gpt-4o",
    tools=[read_email, send_email],
    middleware=[summarizationMiddleware, humanInTheLoopMiddleware]
)

3.3 状态管理(State Management)

LangChain 1.0 提供了灵活的状态管理机制,支持短期记忆和长期存储。

3.3.1 短期记忆(Short-term Memory)

Agent 自动维护对话历史作为短期记忆:

# 默认情况下,Agent 自动维护 messages 列表
result = agent.invoke({
    "messages": [
        {"role": "user", "content": "What's the weather in San Francisco?"}
    ]
})

# 查看对话历史
print(result["messages"])
3.3.2 自定义状态模式
from langchain.agents import AgentState
from typing import TypedDict

class CustomState(TypedDict):
    """自定义状态模式"""
    messages: list  # 必须包含 messages
    user_preferences: dict  # 自定义字段
    session_id: str  # 会话ID

# 通过中间件扩展状态
class StateMiddleware(AgentMiddleware):
    state_schema = CustomState
    
    def before_model(self, state: CustomState, runtime):
        # 访问自定义状态字段
        preferences = state.get("user_preferences", {})
        print(f"User preferences: {preferences}")
        return None

# 通过 state_schema 快速定义
agent = create_agent(
    model="gpt-4o",
    tools=[search, get_weather],
    state_schema=CustomState
)

# 使用自定义状态
result = agent.invoke({
    "messages": [{"role": "user", "content": "Hi"}],
    "user_preferences": {"language": "zh", "style": "formal"},
    "session_id": "session_123"
})

注意事项:

  • 自定义状态模式必须扩展 AgentState(包含 messages 字段)
  • 使用 TypedDict 定义状态类型(不再支持 Pydantic 模型和数据类)[9†]
3.3.3 长期存储(Long-term Memory)

对于需要跨会话持久化的场景,可以使用外部存储:

from langchain.agents.memory import LongTermMemory

# 初始化长期存储
memory = LongTermMemory(
    storage_type="redis",  # 或 "mongodb", "postgresql"
    connection_params={
        "host": "localhost",
        "port": 6379,
        "db": 0
    }
)

# 创建支持长期记忆的 Agent
agent = create_agent(
    model="gpt-4o",
    tools=[search, get_weather],
    memory=memory  # 传入 memory 参数
)

# 查询长期记忆
historical_context = memory.get_context(user_id="user_123")

3.4 结构化输出(Structured Output)

LangChain 1.0 提供了两种结构化输出策略,确保输出符合预期格式。

3.4.1 ToolStrategy(工具调用策略)

ToolStrategy 通过人工工具调用生成结构化输出,适用于所有支持工具调用的模型:

from pydantic import BaseModel
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy

class ContactInfo(BaseModel):
    name: str
    email: str
    phone: str

agent = create_agent(
    model="gpt-4o-mini",
    tools=[search_tool],
    response_format=ToolStrategy(ContactInfo)  # 使用 Tool 策略
)

result = agent.invoke({
    "messages": [{
        "role": "user", 
        "content": "Extract contact info from: John Doe, john@example.com, (555) 123-4567"
    }]
})

print(result["structured_response"])
# 输出: ContactInfo(name='John Doe', email='john@example.com', phone='555-123-4567')
3.4.2 ProviderStrategy(提供商原生策略)

ProviderStrategy 使用模型原生的结构化输出生成,更可靠但仅支持支持该功能的提供商(如 OpenAI):

from langchain.agents.structured_output import ProviderStrategy

agent = create_agent(
    model="gpt-4o",  # 仅支持 ProviderStrategy 的模型
    tools=[search_tool],
    response_format=ProviderStrategy(ContactInfo)
)

result = agent.invoke({
    "messages": [{
        "role": "user", 
        "content": "Extract contact info from: John Doe, john@example.com, (555) 123-4567"
    }]
})

重要提示:

  • 在 LangChain 1.0 中,直接传递 schema(如 response_format=ContactInfo)不再支持,必须显式使用 ToolStrategy 或 ProviderStrategy [9†]
  • 预绑定工具的模型(model.bind_tools())在使用结构化输出时不支持,应传递工具列表给 create_agent() [9†]

四、生产级实战案例

4.1 案例 1:Text-to-SQL 智能数据分析系统

场景描述: 构建一个将自然语言转换为 SQL 查询的智能数据分析系统,支持 RAG(检索增强生成)和数据库查询。

4.1.1 系统架构设计
┌─────────────────────────────────────────────┐
│              Streamlit 前端界面              │
├─────────────────────────────────────────────┤
│              FastAPI 后端服务                │
├─────────────────────────────────────────────┤
│        LangChain Agent (Text-to-SQL)        │
│  ┌──────────────────────────────────────┐   │
│  │  1. 检索相关表结构(Retriever)       │   │
│  │  2. 生成 SQL(LLM + Prompt)         │   │
│  │  3. 验证 SQL(安全检查)             │   │
│  │  4. 执行查询(Database)             │   │
│  └──────────────────────────────────────┘   │
├─────────────────────────────────────────────┤
│        向量数据库(Chroma) + 嵌入模型        │
└─────────────────────────────────────────────┘
4.1.2 完整实现代码

1. 环境准备

pip install langchain faiss-cpu chromadb \
    fastapi uvicorn streamlit \
    langchain-openai langchain-community \
    sqlglot python-dotenv

2. 数据库初始化

# sample_db/create_sample_db.py
import sqlite3
import os

def create_sample_db():
    """创建示例数据库和表"""
    os.makedirs("sample_db", exist_ok=True)
    conn = sqlite3.connect("sample_db/sample.db")
    cur = conn.cursor()
    
    # 创建客户表
    cur.execute("""CREATE TABLE IF NOT EXISTS customers (
        id INTEGER PRIMARY KEY,
        name TEXT,
        email TEXT,
        signup_date TEXT
    )""")
    
    # 创建订单表
    cur.execute("""CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        total_amount REAL,
        status TEXT,
        created_at TEXT,
        FOREIGN KEY(customer_id) REFERENCES customers(id)
    )""")
    
    # 插入示例数据
    customers = [
        (1, "Alice Johnson", "alice@example.com", "2024-12-01"),
        (2, "Bob Lee", "bob@example.com", "2024-12-05"),
        (3, "Carol Singh", "carol@example.com", "2024-12-10"),
        (4, "David Kim", "david.kim@example.com", "2024-12-12")
    ]
    
    orders = [
        (1, 1, 120.50, "completed", "2025-01-03", "First order"),
        (2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
        (3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
        (4, 3, 75.25, "completed", "2025-02-15", "Regular order")
    ]
    
    cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
    cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)
    
    conn.commit()
    conn.close()
    print("Sample database created successfully!")

if __name__ == "__main__":
    create_sample_db()

3. 向量索引构建

# ingestion/index_sqlite.py
import sqlite3
import os
from tqdm import tqdm
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

def row_hash(values):
    """为行生成唯一哈希值"""
    import hashlib
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """将 SQLite 行记录转换为可读文本块"""
    return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """将表索引进向量库"""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table})")
    cols = [c[1] for c in cur.fetchall()]
    
    cur.execute(f"SELECT {','.join(cols)} FROM {table}")
    rows = cur.fetchall()
    
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    
    vectorstore = Chroma(
        collection_name="sqlite_docs",
        persist_directory="./chroma_persist",
        embedding_function=embeddings
    )
    
    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])
        hid = row_hash(r)
        
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})
    
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)
    print(f"Indexed table: {table}")

def main():
    """主索引流程"""
    SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
    conn = sqlite3.connect(SQLITE_PATH)
    
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]
    
    for table in tqdm(tables, desc="Indexing tables"):
        index_table(conn, table)
    
    conn.close()
    print("Indexing completed and persisted to Chroma!")

if __name__ == "__main__":
    main()

4. LangChain Agent 实现

# server/langgraph_nodes.py
import re
from langchain_core.runnables import RunnableLambda
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings

# 初始化检索器
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory="./chroma_persist",
    embedding_function=embeddings
)
retriever = vectorstore.as_retriever()

async def retriever_node(state):
    """检索相关表结构和数据"""
    docs = await retriever.ainvoke(state["question"])
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state

def build_sql_prompt():
    """构建 SQL 生成提示词"""
    from langchain_core.prompts import PromptTemplate
    return PromptTemplate.from_template(
        """You are a SQL generator. Based on the context below, generate a single read-only SQLite SELECT query (no semicolon, no multiple statements).

Context:
{context}

Question:
{question}

Only return the SELECT SQL statement."""
    )

def clean_llm_sql_output(text):
    """清洗 LLM 输出,仅保留 SELECT 语句"""
    out = str(text or "").strip()
    # 移除 Markdown 代码围栏
    out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()
    # 提取 SELECT 语句
    match = re.search(r"(select\b.*)", out, flags=re.IGNORECASE | re.DOTALL)
    if match:
        out = match.group(1).rstrip(";").strip()
    return out

async def sql_generator_node(state):
    """生成 SQL 查询"""
    from langchain_openai import ChatOpenAI
    
    llm = ChatOpenAI(model="gpt-4o")
    prompt = build_sql_prompt()
    
    context = "\n\n".join(state.get("retrieved_docs", []))
    prompt_text = prompt.format(context=context, question=state["question"])
    
    out = await llm.ainvoke(prompt_text)
    sql = clean_llm_sql_output(out.content)
    
    state["generated_sql"] = sql
    return state

5. SQL 验证与执行

# server/sql_validator.py
import sqlglot

def validate_sql(sql, allowed_tables):
    """验证 SQL 安全性"""
    ALLOWED_STATEMENTS = {"select"}
    DISALLOWED = {"delete", "update", "insert", "drop", "alter"}
    
    # 检查分号和多语句
    if ";" in sql:
        return False, "不允许分号或多语句"
    
    # 检查禁用关键字
    for kw in DISALLOWED:
        if f"{kw}" in sql.lower():
            return False, f"不允许的关键字: {kw}"
    
    # 解析 SQL
    try:
        parsed = sqlglot.parse_one(sql, read="sqlite")
    except Exception as e:
        return False, f"SQL 解析错误: {e}"
    
    # 检查语句类型
    if parsed.key.lower() not in ALLOWED_STATEMENTS:
        return False, "只允许 SELECT 语句"
    
    # 检查表白名单
    def extract_tables(s):
        return set(re.findall(r"\bfrom\s+([a-zA-Z0-9_]+)", s, flags=re.IGNORECASE))
    
    tables = extract_tables(sql)
    if not tables.issubset(set(allowed_tables)):
        return False, f"使用了不在白名单的表: {tables - set(allowed_tables)}"
    
    return True, "ok"
# server/executor.py
import sqlite3

def execute_sql(sql, row_limit=1000):
    """执行 SQL 查询"""
    def enforce_limit(q, limit):
        q_low = q.lower()
        if " limit " in q_low:
            return q
        return f"{q}LIMIT {int(limit)}"
    
    def open_ro_conn():
        conn = sqlite3.connect("sample_db/sample.db")
        conn.execute("PRAGMA query_only = 1")  # 只读模式
        return conn
    
    sql = enforce_limit(sql, row_limit)
    
    conn = open_ro_conn()
    conn.execute("PRAGMA busy_timeout = 5000")  # 忙等待
    
    cur = conn.cursor()
    cur.execute(sql)
    
    cols = [c[0] for c in cur.description] if cur.description else []
    rows = cur.fetchmany(row_limit)
    
    conn.close()
    return cols, rows

6. FastAPI 后端服务

# server/main.py
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv

from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql

load_dotenv()

app = FastAPI(title="RAG Text-to-SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    """运行完整 RAG→SQL→验证→执行流程"""
    
    # 1. 检索相关文档
    state = {"question": req.question, "messages": []}
    state = await retriever_node(state)
    
    # 2. 生成 SQL
    state = await sql_generator_node(state)
    sql = state["generated_sql"]
    
    # 3. 验证 SQL
    allowed_tables = ["customers", "orders"]
    ok, reason = validate_sql(sql, allowed_tables)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL 验证失败: {reason}. SQL: {sql}")
    
    # 4. 执行查询
    cols, rows = execute_sql(sql)
    result = [dict(zip(cols, r)) for r in rows]
    
    return {
        "sql": sql if req.show_sql else None,
        "cols": cols,
        "rows": result
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.app.run(app, host="0.0.0.0", port=8000)

7. Streamlit 前端界面

# app.py
import streamlit as st
import requests

API_URL = "http://localhost:8000/query"

st.set_page_config(page_title="RAG Text-to-SQL Demo", layout="centered")

st.title("🤖 RAG Text-to-SQL 智能分析系统")

with st.form("query_form"):
    question = st.text_input(
        "输入自然语言问题",
        value="显示上个月注册的所有客户"
    )
    show_sql = st.checkbox("显示生成的 SQL", value=True)
    submitted = st.form_submit_button("🚀 查询")
    
    if submitted:
        if not question.strip():
            st.warning("请输入问题")
        else:
            with st.spinner("查询中..."):
                try:
                    resp = requests.post(
                        API_URL,
                        json={"question": question, "show_sql": show_sql},
                        timeout=60
                    )
                    resp.raise_for_status()
                    data = resp.json()
                    
                    if show_sql:
                        st.subheader("🧠 生成的 SQL")
                        st.code(data.get("sql", ""), language="sql")
                    
                    st.subheader("📊 查询结果")
                    rows = data.get("rows", [])
                    if rows:
                        st.dataframe(rows)
                    else:
                        st.info("该查询未返回任何行")
                        
                except requests.exceptions.HTTPError as http_err:
                    st.error(f"HTTP 错误: {http_err} - {resp.text}")
                except requests.exceptions.ConnectionError:
                    st.error("无法连接 API。请确认 FastAPI 在 localhost:8000 运行中。")
                except requests.exceptions.Timeout:
                    st.error("请求超时,请稍后再试。")
                except Exception as e:
                    st.error(f"意外错误: {e}")

8. 运行与部署

# 1. 创建数据库
python sample_db/create_sample_db.py

# 2. 构建向量索引
python ingestion/index_sqlite.py

# 3. 启动 FastAPI 后端
uvicorn server.main:app --reload --host 0.0.0.0 --port 8000

# 4. 启动 Streamlit 前端(在新终端)
streamlit run app.py --server.port 8501
4.1.3 系统特性与优化

1. 安全特性

  • 只读 SQL 查询(禁止写操作)
  • 表白名单验证
  • SQL 注入防护(通过 sqlglot 解析)

2. 性能优化

  • 向量检索加速查询相关表结构
  • SQL 结果集限制(防止大量数据返回)
  • 异步处理提高并发能力

3. 扩展方向

  • 增量索引更新(监听数据库变化)
  • 查询缓存(Redis 缓存热点查询)
  • 多数据源支持(PostgreSQL、MySQL 等)
  • 用户权限管理(不同用户不同表权限)

4.2 案例 2:多模态 RAG 智能问答系统

场景描述: 构建一个支持文本、图像、音频和 PDF 多模态文档的 RAG 系统,实现智能问答和知识检索。

4.2.1 系统架构
┌─────────────────────────────────────────────┐
│              Web 前端(React/Vue)             │
├─────────────────────────────────────────────┤
│              API 网关(Nginx)                │
├─────────────────────────────────────────────┤
│              LangChain Agent                  │
│  ┌──────────────────────────────────────┐   │
│  │  多模态文档解析器                    │   │
│  │  ┌────────┐ ┌────────┐ ┌────────┐  │   │
│  │  │文本解析│ │图像解析│ │音频解析│  │   │
│  │  └────────┘ └────────┘ └────────┘  │   │
│  │  向量检索(FAISS/Qdrant)            │   │
│  │  生成回答(LLM + RAG)              │   │
│  └──────────────────────────────────────┘   │
├─────────────────────────────────────────────┤
│        文件存储(S3/OSS) + 元数据数据库       │
└─────────────────────────────────────────────┘
4.2.2 核心实现代码

1. 多模态文档解析器

# processors/multimodal_processor.py
from typing import List, Dict, Any
import pytesseract
from PIL import Image
import pdf2image
import speech_recognition as sr
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

class MultiModalProcessor:
    """多模态文档处理器"""
    
    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
    
    def process_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Document]:
        """处理文本文档"""
        docs = self.text_splitter.split_text(text)
        return [Document(page_content=doc, metadata=metadata or {}) for doc in docs]
    
    def process_image(self, image_path: str, metadata: Dict[str, Any] = None) -> List[Document]:
        """处理图像文档(OCR)"""
        try:
            image = Image.open(image_path)
            text = pytesseract.pytesseract.image_to_string(image)
            docs = self.text_splitter.split_text(text)
            return [Document(page_content=doc, metadata={**(metadata or {}), "source": image_path}) for doc in docs]
        except Exception as e:
            print(f"Error processing image {image_path}: {e}")
            return []
    
    def process_audio(self, audio_path: str, metadata: Dict[str, Any] = None) -> List[Document]:
        """处理音频文档(语音识别)"""
        try:
            r = sr.Recognizer()
            with sr.AudioFile(audio_path) as source:
                audio = r.record(source)
            text = r.recognize_google(audio, language='zh-CN')
            docs = self.text_splitter.split_text(text)
            return [Document(page_content=doc, metadata={**(metadata or {}), "source": audio_path}) for doc in docs]
        except Exception as e:
            print(f"Error processing audio {audio_path}: {e}")
            return []
    
    def process_pdf(self, pdf_path: str, metadata: Dict[str, Any] = None) -> List[Document]:
        """处理 PDF 文档"""
        try:
            images = pdf2image.convertfrompath(pdf_path)
            docs = []
            for i, image in enumerate(images):
                text = pytesseract.pytesseract.image_to_string(image)
                docs.extend(self.text_splitter.split_text(text))
            return [Document(page_content=doc, metadata={**(metadata or {}), "source": pdf_path}) for doc in docs]
        except Exception as e:
            print(f"Error processing PDF {pdf_path}: {e}")
            return []

2. 向量检索与存储

# retrieval/vector_store.py
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from typing import List, Dict, Any

class VectorStoreManager:
    """向量存储管理器"""
    
    def __init__(self):
        self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
        self.vectorstore = None
    
    def add_documents(self, documents: List[Document]):
        """添加文档到向量库"""
        if self.vectorstore is None:
            self.vectorstore = FAISS.from_documents(
                documents,
                self.embeddings
            )
        else:
            self.vectorstore.add_documents(documents)
    
    def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        """相似性检索"""
        if self.vectorstore is None:
            return []
        return self.vectorstore.similarity_search(query, k=k)
    
    def save(self, path: str):
        """保存向量库"""
        if self.vectorstore:
            self.vectorstore.save_local(path)
    
    def load(self, path: str):
        """加载向量库"""
        self.vectorstore = FAISS.load_local(path, self.embeddings)

3. LangChain Agent 实现

# agents/multimodal_agent.py
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate

class MultiModalRAGAgent:
    """多模态 RAG Agent"""
    
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(model="gpt-4o")
        self.agent = self._create_agent()
    
    def _create_agent(self):
        """创建 Agent"""
        prompt = PromptTemplate.from_messages([
            ("system", "你是一个智能助手,基于多模态文档库回答用户问题。请基于检索到的上下文准确回答。"),
            ("human", "{question}")
        ])
        
        return create_agent(
            model=self.llm,
            tools=[self._retrieval_tool],
            system_prompt="You are a helpful assistant that answers questions based on retrieved context.",
            middleware=[
                self._context_injection_middleware
            ]
        )
    
    async def _retrieval_tool(self, query: str) -> str:
        """检索工具"""
        docs = self.vector_store.similarity_search(query, k=5)
        context = "\n\n".join([doc.page_content for doc in docs])
        return context
    
    def _context_injection_middleware(self, request, handler):
        """上下文注入中间件"""
        # 检索相关文档
        docs = self.vector_store.similarity_search(request.state["question"], k=5)
        context = "\n\n".join([doc.page_content for doc in docs])
        
        # 将上下文注入到 state 中
        request.state["retrieved_context"] = context
        return handler(request)
    
    def ask(self, question: str) -> str:
        """提问"""
        return self.agent.invoke({
            "messages": [{"role": "user", "content": question}]
        })

4. 完整系统组装

# main.py
import os
from processors.multimodal_processor import MultiModalProcessor
from retrieval.vector_store import VectorStoreManager
from agents.multimodal_agent import MultiModalRAGAgent

def main():
    # 初始化处理器
    processor = MultiModalProcessor()
    
    # 处理文档
    all_documents = []
    
    # 处理文本文件
    text_files = ["docs/intro.txt", "docs/guide.md"]
    for file_path in text_files:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
            docs = processor.process_text(text, metadata={"source": file_path})
            all_documents.extend(docs)
    
    # 处理图像文件
    image_files = ["docs/screenshot1.png", "docs/diagram.jpg"]
    for file_path in image_files:
        docs = processor.process_image(file_path, metadata={"source": file_path})
        all_documents.extend(docs)
    
    # 处理 PDF 文件
    pdf_files = ["docs/manual.pdf", "docs/report.pdf"]
    for file_path in pdf_files:
        docs = processor.process_pdf(file_path, metadata={"source": file_path})
        all_documents.extend(docs)
    
    # 初始化向量库
    vector_store = VectorStoreManager()
    vector_store.add_documents(all_documents)
    vector_store.save("./vector_store")
    
    # 创建 Agent
    agent = MultiModalRAGAgent(vector_store)
    
    # 交互式问答
    print("🤖 多模态 RAG 系统已就绪,可以开始提问了!")
    while True:
        question = input("👤 你: ")
        if question.lower() in ["quit", "exit"]:
            break
        response = agent.ask(question)
        print(f"🤖 助手: {response}")

if __name__ == "__main__":
    main()
4.2.3 系统特性

1. 多模态支持

  • 文本、图像、音频、 PDF 统一处理
  • OCR 识别提取图像和 PDF 中的文字
  • 语音识别转换音频为文字

2. 智能检索

  • 语义相似度检索
  • 支持混合检索(关键词 + 语义)
  • 检索结果排序和过滤

3. 生产级特性

  • 文档增量更新
  • 查询缓存
  • 日志记录和监控

4.3 案例 3:智能客服 Agent 系统

场景描述: 构建一个支持多渠道(网页、微信、邮件)的智能客服系统,能够自动回答用户问题、工单分类、智能路由。

4.3.1 系统架构
┌─────────────────────────────────────────────┐
│        多渠道接入(Web/WeChat/Email)          │
├─────────────────────────────────────────────┤
│              消息路由与标准化                  │
├─────────────────────────────────────────────┤
│              LangChain Agent                  │
│  ┌──────────────────────────────────────┐   │
│  │  意图识别(Intent Classification)   │   │
│  │  实体抽取(Entity Extraction)       │   │
│  │  知识检索(FAQ/RAG)                 │   │
│  │  工单生成(Ticket Creation)         │   │
│  └──────────────────────────────────────┘   │
├─────────────────────────────────────────────┤
│        知识库(向量库) + CRM 系统             │
└─────────────────────────────────────────────┘
4.3.2 核心实现

1. 多渠道消息接入

# channels/message_router.py
from typing import Dict, Any, Union
from dataclasses import dataclass

@dataclass
class Message:
    """消息对象"""
    id: str
    channel: str  # web, wechat, email
    user_id: str
    content: str
    metadata: Dict[str, Any] = None

class MessageRouter:
    """消息路由器"""
    
    def __init__(self):
        self.handlers = {
            "web": self._handle_web_message,
            "wechat": self._handle_wechat_message,
            "email": self._handle_email_message
        }
    
    def route(self, message: Message) -> Dict[str, Any]:
        """路由消息到对应处理器"""
        handler = self.handlers.get(message.channel)
        if not handler:
            raise ValueError(f"Unsupported channel: {message.channel}")
        
        return handler(message)
    
    def _handle_web_message(self, message: Message) -> Dict[str, Any]:
        """处理网页消息"""
        return {
            "user_id": message.user_id,
            "content": message.content,
            "channel": "web",
            "metadata": message.metadata
        }
    
    def _handle_wechat_message(self, message: Message) -> Dict[str, Any]:
        """处理微信消息"""
        # 微信消息特殊处理
        content = self._clean_wechat_content(message.content)
        return {
            "user_id": message.user_id,
            "content": content,
            "channel": "wechat",
            "metadata": message.metadata
        }
    
    def _handle_email_message(self, message: Message) -> Dict[str, Any]:
        """处理邮件消息"""
        # 邮件解析
        subject = message.metadata.get("subject", "")
        body = message.content
        
        return {
            "user_id": message.user_id,
            "content": f"Subject: {subject}\n{body}",
            "channel": "email",
            "metadata": message.metadata
        }

2. 意图识别与实体抽取

# agents/intent_agent.py
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List, Dict, Any

class IntentResult(BaseModel):
    """意图识别结果"""
    intent: str = Field(description="意图类别")
    confidence: float = Field(description="置信度")
    entities: Dict[str, Any] = Field(description="实体信息")

class IntentAgent:
    """意图识别 Agent"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
        self.agent = self._create_agent()
    
    def _create_agent(self):
        """创建意图识别 Agent"""
        from langchain.tools import tool
        
        @tool
        def classify_intent(query: str) -> str:
            """分类用户意图"""
            # 意图类别:咨询、投诉、建议、技术支持、订单查询等
            return self.llm.invoke(f"Classify the user's intent into one of: [inquiry, complaint, suggestion, technical_support, order_inquiry]. User query: {query}")
        
        @tool
        def extract_entities(query: str, intent: str) -> Dict[str, Any]:
            """抽取实体信息"""
            # 根据不同意图抽取不同实体
            if intent == "order_inquiry":
                return self.llm.invoke(f"Extract order number and product name from: {query}")
            elif intent == "technical_support":
                return self.llm.invoke(f"Extract issue description and error message from: {query}")
            else:
                return {}
        
        return create_agent(
            model=self.llm,
            tools=[classify_intent, extract_entities],
            system_prompt="You are an intent classification and entity extraction assistant."
        )
    
    def process(self, query: str) -> IntentResult:
        """处理用户查询"""
        # 调用 Agent 识别意图和抽取实体
        result = self.agent.invoke({"messages": [{"role": "user", "content": query}]})
        
        return IntentResult(
            intent=result["intent"],
            confidence=result["confidence"],
            entities=result["entities"]
        )

3. 知识检索与回答生成

# agents/knowledge_agent.py
from langchain.agents import create_agent
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI

class KnowledgeAgent:
    """知识检索 Agent"""
    
    def __init__(self, knowledge_base_path: str = "./knowledge_base"):
        self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
        self.vectorstore = FAISS.load_local(knowledge_base_path, self.embeddings)
        self.llm = ChatOpenAI(model="gpt-4o")
        self.agent = self._create_agent()
    
    def _create_agent(self):
        """创建知识检索 Agent"""
            from langchain.tools import tool
            
            @tool
            def search_knowledge(query: str) -> str:
                """检索知识库"""
                docs = self.vectorstore.similarity_search(query, k=5)
                context = "\n\n".join([doc.page_content for doc in docs])
                return context
            
            return create_agent(
                model=self.llm,
                tools=[search_knowledge],
                system_prompt="You are a helpful customer service assistant. Answer questions based on the retrieved knowledge base."
            )
    
    def answer_question(self, query: str, intent: str, entities: Dict[str, Any]) -> str:
        """回答用户问题"""
        # 根据意图和实体定制回答
        context = f"Intent: {intent}\nEntities: {entities}\nQuery: {query}"
        
        response = self.agent.invoke({
            "messages": [{"role": "user", "content": context}]
        })
        
        return response

4. 工单管理系统

# systems/ticket_system.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime

@dataclass
class Ticket:
    """工单对象"""
    id: str
    user_id: str
    channel: str
    intent: str
    content: str
    status: str  # open, in_progress, resolved, closed
    priority: str  # low, medium, high, urgent
    created_at: datetime
    updated_at: datetime
    assigned_to: str = None
    metadata: Dict[str, Any] = None

class TicketSystem:
    """工单管理系统"""
    
    def __init__(self):
        self.tickets: Dict[str, Ticket] = {}
        self.ticket_counter = 1
    
    def create_ticket(self, user_id: str, channel: str, intent: str, content: str, metadata: Dict[str, Any] = None) -> Ticket:
        """创建工单"""
        ticket_id = f"TKT-{self.ticket_counter:06d}"
        self.ticket_counter += 1
        
        ticket = Ticket(
            id=ticket_id,
            user_id=user_id,
            channel=channel,
            intent=intent,
            content=content,
            status="open",
            priority=self._calculate_priority(intent),
            created_at=datetime.now(),
            updated_at=datetime.now(),
            metadata=metadata or {}
        )
        
        self.tickets[ticket_id] = ticket
        return ticket
    
    def _calculate_priority(self, intent: str) -> str:
        """根据意图计算优先级"""
        priority_map = {
            "complaint": "high",
            "technical_support": "medium",
            "order_inquiry": "low",
            "suggestion": "low"
        }
        return priority_map.get(intent, "medium")
    
    def update_ticket(self, ticket_id: str, **kwargs) -> Ticket:
        """更新工单"""
        ticket = self.tickets.get(ticket_id)
        if not ticket:
            raise ValueError(f"Ticket not found: {ticket_id}")
        
        for key, value in kwargs.items():
            if hasattr(ticket, key):
                setattr(ticket, key, value)
        
        ticket.updated_at = datetime.now()
        return ticket
    
    def assign_ticket(self, ticket_id: str, agent_id: str):
        """分配工单"""
        self.update_ticket(ticket_id, assigned_to=agent_id, status="in_progress")
    
    def resolve_ticket(self, ticket_id: str):
        """解决工单"""
        self.update_ticket(ticket_id, status="resolved")
    
    def close_ticket(self, ticket_id: str):
        """关闭工单"""
        self.update_ticket(ticket_id, status="closed")
    
    def get_tickets_by_user(self, user_id: str) -> List[Ticket]:
        """获取用户的所有工单"""
        return [t for t in self.tickets.values() if t.user_id == user_id]
    
    def get_open_tickets(self) -> List[Ticket]:
        """获取所有打开的工单"""
        return [t for t in self.tickets.values() if t.status in ["open", "in_progress"]]

5. 完整系统组装

# main.py
import asyncio
from channels.message_router import MessageRouter, Message
from agents.intent_agent import IntentAgent
from agents.knowledge_agent import KnowledgeAgent
from systems.ticket_system import TicketSystem

class CustomerServiceSystem:
    """智能客服系统"""
    
    def __init__(self):
        self.message_router = MessageRouter()
        self.intent_agent = IntentAgent()
        self.knowledge_agent = KnowledgeAgent()
        self.ticket_system = TicketSystem()
    
    async def handle_message(self, message: Message):
        """处理用户消息"""
        # 1. 路由和标准化消息
        routed_message = self.message_router.route(message)
        
        # 2. 意图识别和实体抽取
        intent_result = self.intent_agent.process(routed_message["content"])
        
        # 3. 生成回答
        response = self.knowledge_agent.answer_question(
            routed_message["content"],
            intent_result.intent,
            intent_result.entities
        )
        
        # 4. 如果是复杂问题,创建工单
        if intent_result.intent in ["complaint", "technical_support"] and intent_result.confidence > 0.8:
            ticket = self.ticket_system.create_ticket(
                user_id=routed_message["user_id"],
                channel=routed_message["channel"],
                intent=intent_result.intent,
                content=routed_message["content"],
                metadata=intent_result.entities
            )
            response += f"\n\n已为您创建工单 {ticket.id},我们会尽快处理。"
        
        return response

async def main():
    """主函数"""
    system = CustomerServiceSystem()
    
    # 模拟用户消息
    messages = [
        Message(id="1", channel="web", user_id="user_123", content="我的订单什么时候到货?"),
        Message(id="2", channel="wechat", user_id="user_456", content="产品无法使用,需要帮助"),
        Message(id="3", channel="email", user_id="user_789", content="建议增加新功能")
    ]
    
    for msg in messages:
        response = await system.handle_message(msg)
        print(f"用户: {msg.content}")
        print(f"助手: {response}")
        print("-" * 50)

if __name__ == "__main__":
    asyncio.run(main())
4.3.3 系统特性

1. 多渠道接入

  • 统一消息接口
  • 不同渠道消息格式标准化
  • 渠道特定处理逻辑

2. 智能意图识别

  • 自动分类用户意图
  • 实体信息抽取
  • 置信度评估

3. 知识检索与问答

  • RAG 检索 FAQ 知识库
  • 上下文感知回答生成
  • 多轮对话支持

4. 工单管理

  • 自动工单创建
  • 优先级计算
  • 工单状态跟踪
  • 人工分配与处理

五、迁移指南:从 0.x 到 1.0

5.1 核心变更对照表

变更点 0.x 版本 1.0 版本 迁移建议
Agent 创建 create_react_agent() create_agent() 函数名变更,参数调整 [4†]
导入路径 @langchain/langgraph/prebuilts langchain 导入路径变更
系统提示 prompt 参数 system_prompt 参数 参数名变更
模型绑定 model.bind_tools() 不支持预绑定 传递工具列表给 create_agent() [9†]
状态管理 Pydantic 模型 TypedDict 使用 TypedDict 定义状态 [9†]
结构化输出 response_format=Schema response_format=ToolStrategy(Schema) 使用策略模式 [9†]
中间件 不支持 全面支持 迁移到中间件机制
动态提示 不支持 @dynamic_prompt 装饰器 使用中间件实现动态提示

5.2 迁移步骤与最佳实践

步骤 1:更新依赖包

pip install --upgrade langchain==1.0.0 \
    langchain-openai==1.0.0 \
    langchain-community==1.0.0

步骤 2:更新 Agent 创建代码

# 0.x 版本
from langchain.agents import create_react_agent

agent = create_react_agent(
    model=model,
    tools=tools,
    prompt="You are a helpful assistant."  # prompt 参数
)

# 1.0 版本
from langchain.agents import create_agent

agent = create_agent(
    model=model,
    tools=tools,
    system_prompt="You are a helpful assistant."  # system_prompt 参数
)

步骤 3:迁移中间件逻辑

# 0.x 版本:使用函数钩子
def before_model_hook(state):
    # 自定义逻辑
    pass

agent = create_react_agent(
    model=model,
    tools=tools,
    prompt="You are a helpful assistant.",
    callbacks=[before_model_hook]  # 使用回调
)

# 1.0 版本:使用中间件
from langchain.agents.middleware import AgentMiddleware

class CustomMiddleware(AgentMiddleware):
    def before_model(self, state, runtime):
        # 自定义逻辑
        return None

agent = create_agent(
    model=model,
    tools=tools,
    system_prompt="You are a helpful assistant.",
    middleware=[CustomMiddleware()]  # 使用中间件
)

步骤 4:更新状态管理

# 0.x 版本:使用 Pydantic 模型
from pydantic import BaseModel

class CustomState(BaseModel):
    messages: list
    user_data: dict

# 1.0 版本:使用 TypedDict
from typing import TypedDict

class CustomState(TypedDict):
    messages: list
    user_data: dict

步骤 5:处理工具调用变更

# 0.x 版本:预绑定工具
model = ChatOpenAI(model="gpt-4o")
model = model.bind_tools([search_tool, get_weather])
agent = create_agent(model=model, tools=[])

# 1.0 版本:传递工具列表
agent = create_agent(
    model="gpt-4o",  # 传递模型标识符或实例
    tools=[search_tool, get_weather]  # 传递工具列表
)

5.3 兼容性注意事项

  1. Python 版本要求: LangChain 1.0 要求 Python 3.10 或更高版本 [13†]
  2. 删除的 API: 某些 0.x 版本的 API 已被移除,如 prompted output [9†]
  3. 默认行为变更: 如 create_agent 不再接受预绑定工具,需要传递工具列表 [9†]
  4. 中间件优先级: 中间件执行顺序影响行为,需要仔细测试

六、生产级部署与运维

6.1 系统部署架构

┌─────────────────────────────────────────────┐
│              负载均衡器(Nginx)               │
├─────────────────────────────────────────────┤
│              API 网关(Kong/APISIX)           │
├─────────────────────────────────────────────┤
│        LangChain Agent 服务(多实例)          │
│  ┌────────┐ ┌────────┐ ┌────────┐          │
│  │Agent 1 │ │Agent 2 │ │Agent N │          │
│  └────────┘ └────────┘ └────────┘          │
├─────────────────────────────────────────────┤
│              消息队列(Redis/RabbitMQ)        │
├─────────────────────────────────────────────┤
│              数据库(PostgreSQL/MongoDB)      │
└─────────────────────────────────────────────┘

6.2 容器化部署

Dockerfile

FROM python:3.10-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "server.main:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml

version: '3.8'

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=${DATABASE_URL}
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=langchain_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:

6.3 监控与日志

1. 日志配置

# logging_config.py
import logging
import sys
from pythonjsonlogger import jsonlogger

def setup_logging():
    """配置结构化日志"""
    logHandler = logging.StreamHandler(sys.stdout)
    logHandler.setFormatter(jsonlogger.JsonFormatter())
    
    logger = logging.getLogger()
    logger.addHandler(logHandler)
    logger.setLevel(logging.INFO)
    
    return logger

# 在应用中使用
from logging_config import setup_logging
logger = setup_logging()

2. 性能监控

# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# 定义指标
REQUEST_COUNT = Counter('agent_requests_total', 'Total number of requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('agent_request_duration_seconds', 'Request duration')
ACTIVE_AGENTS = Gauge('agent_active_instances', 'Number of active agent instances')

def track_request(func):
    """请求跟踪装饰器"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            duration = time.time() - start_time
            REQUEST_DURATION.observe(duration)
    return wrapper

# 使用示例
@track_request
def handle_request(request):
    # 处理请求
    pass

3. 健康检查

# healthcheck.py
from fastapi import FastAPI
from typing import Dict, Any

app = FastAPI()

@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """健康检查端点"""
    return {
        "status": "healthy",
        "timestamp": str(datetime.now()),
        "version": "1.0.0"
    }

@app.get("/ready")
async def readiness_check() -> Dict[str, Any]:
    """就绪检查"""
    # 检查依赖服务连接
    db_connected = check_database_connection()
    redis_connected = check_redis_connection()
    
    if db_connected and redis_connected:
        return {"status": "ready"}
    else:
        raise HTTPException(status_code=503, detail="Service not ready")

6.4 扩展性与性能优化

1. 水平扩展

  • 无状态设计: Agent 服务设计为无状态,便于水平扩展
  • 负载均衡: 使用 Nginx 或云服务商的负载均衡器分发请求
  • 自动伸缩: 根据 CPU 使用率、请求队列长度等指标自动调整实例数量

2. 缓存策略

# caching/redis_cache.py
import redis
import json
from typing import Any, Optional

class RedisCache:
    """Redis 缓存客户端"""
    
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
    
    def get(self, key: str) -> Optional[Any]:
        """获取缓存"""
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None
    
    def set(self, key: str, value: Any, ttl: int = 3600):
        """设置缓存"""
        self.redis.setex(key, ttl, json.dumps(value))
    
    def delete(self, key: str):
        """删除缓存"""
        self.redis.delete(key)

# 使用示例
cache = RedisCache("redis://localhost:6379")

def cached_agent_invoke(agent_id: str, query: str):
    """带缓存的 Agent 调用"""
    cache_key = f"agent:{agent_id}:query:{hash(query)}"
    cached_result = cache.get(cache_key)
    
    if cached_result:
        return cached_result
    
    result = agent.invoke(query)
    cache.set(cache_key, result, ttl=1800)  # 缓存30分钟
    
    return result

3. 异步处理

# async_processing.py
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncAgentProcessor:
    """异步 Agent 处理器"""
    
    def __init__(self, agent, max_workers: int = 10):
        self.agent = agent
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    async def process_batch(self, queries: list) -> list:
        """批量处理查询"""
        loop = asyncio.get_event_loop()
        
        # 提交到线程池执行
        tasks = [
            loop.run_in_executor(self.executor, self.agent.invoke, q)
            for q in queries
        ]
        
        results = await asyncio.gather(*tasks)
        return results

# 使用示例
processor = AsyncAgentProcessor(agent)
results = await processor.process_batch([
    "What's the weather in SF?",
    "Search for AI news",
    "Analyze the data"
])

七、总结与展望

7.1 核心要点回顾

  1. 版本演进: LangChain 1.0 从 Chain 设计转向 Agent 优先,引入中间件机制和统一抽象,为生产级应用奠定基础 [1†]

  2. 架构革新: 静态/动态模型选择、工具调用优化、结构化输出策略等核心特性,显著提升了系统的灵活性和可控性 [0†]

  3. 中间件机制: 提供 5 大钩子函数,支持日志、监控、权限控制、错误处理等横切关注点的统一管理 [2†]

  4. 生产级特性: 状态管理、流式输出、错误处理、性能优化等企业级功能,满足生产环境的复杂需求

7.2 实践建议

  1. 渐进式迁移: 从小范围试点开始,逐步迁移核心业务到 1.0 版本,避免大规模重构风险

  2. 中间件优先: 充分利用中间件机制实现日志、监控、权限等通用功能,避免代码重复

  3. 安全第一: 严格验证工具调用、SQL 查询等外部输入,防止注入攻击和数据泄露

  4. 性能监控: 建立完善的监控体系,实时跟踪系统性能、错误率和用户体验

  5. 测试覆盖: 编写单元测试、集成测试和端到端测试,确保系统稳定性和功能正确性

7.3 未来展望

LangChain 1.0 的发布标志着 AI Agent 开发进入新阶段。未来发展方向包括:

  1. 更智能的决策: 结合强化学习、多模态融合等技术,提升 Agent 的智能水平和决策能力

  2. 更强的协作: 多 Agent 协作、Agent 与人类协同工作,形成更复杂的智能系统

  3. 更广的连接: 深度集成企业系统(ERP、CRM、OA 等),实现业务流程全自动化

  4. 更好的体验: 自然语言交互、多模态交互、个性化定制,提升用户体验

通过本指南的学习和实践,相信您能够构建出稳定、高效、可扩展的生产级 AI Agent 系统,为业务创造价值。


完整示例代码仓库: 本指南所有案例代码已整理在 GitHub 仓库中,供参考和学习使用。

引用来源: 本指南基于 LangChain 官方文档、技术博客和开源项目案例编写,确保内容的准确性和时效性。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐