1. 导包+获取Open AI API KEY

"""当然,工具调用的过程也可以手动进行实现,我们只需要进行适当的逻辑修改即可。如下示例所示:"""
from typing import Optional
from pydantic import BaseModel, Field
from langchain_core.tools import tool
import requests
import json
import os
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
# =====> Other dependencies
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage

load_dotenv()  # 自动加载当前目录下的 .env 文件
if os.getenv("OPENAI_API_KEY") is None:
    raise ValueError

llm = ChatOpenAI(model="gpt-3.5-turbo",base_url="https://api.chatanywhere.tech/v1")

2. 定义图的状态模式(使用Reducer函数)

# 定义图的状态模式
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage],operator.add]  # 这里也可以使用MessageGraph作为它的状态传递也是可以的

3. 定义三个外部工具并使用Pydantic格式化输出

3.1 定义信息搜索工具

class SearchQuery(BaseModel):
    query: str = Field(description="Questions for networking queries")

@tool(args_schema=SearchQuery)
def fetch_real_time_info(query):
    """Get real-time Internet information"""
    # 这里打印一下使用Pydantic格式化输出以后的用户问题
    print('\n Formatted query by SearchQuery class:', query)
    print('\n')
    """Get real-time Internet information"""
    url = "https://google.serper.dev/search"

    payload = json.dumps({
        "q": query,
        "num": 1,
    })

    headers = {
        'X-API-KEY': 'xxx',
        'Content-Type': 'application/json'
    }

    response = requests.post(url, headers=headers, data=payload)

    data = json.loads(response.text) # 将返回的JSON字符串转换成字典
    if 'organic' in data:
        return json.dumps(data['organic'], ensure_ascii=False) # 返回‘organic’部分的数据
    else:
        return json.dumps({"error": "No organic results found."}, ensure_ascii=False) # 返回错误信息

3.2 定义天气查询工具

class WeatherLoc(BaseModel):
    location: str = Field(description="The location name of the city")
@tool(args_schema=WeatherLoc)
def get_weather(location):
    """Call to get the current weather."""
    # 这里打印一下使用Pydantic格式化输出以后的用户问题
    print('\n Formatted query by WeatherLocation class:', location)
    print('\n')

    api_key = "xxx"
    url = f"http://api.weatherapi.com/v1/current.json?key={api_key}&q={location}&lang=zh"

    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return f"{location}的温度是{data['current']['temp_c']}度,天气{data['current']['condition']['text']}。"
    else:
        return "不好意思,并未查询到具体的天气信息。"

class IPQueryRequest(BaseModel):
    ip_address: str = Field(min_length=7,max_length=15,
                            pattern=r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$",
                            description="需要查询的IPv4的地址")
    languge: Optional[str] = Field(default="zh-CN", min_length=2,
                                   max_length=5, description="返回信息的语言代码(默认中文)")

3.3 定义查询固定IP对应地理位置的工具

@tool(args_schema=IPQueryRequest)
def ip_lookup(ip_address: str, languge: str = "zh-CN") -> str:
    """Query the geographical location based on the IP address"""
    print('\n Formatted query by IPQueryRequest class:',ip_address)
    print('\n')

    try:
        url = f"https://ipinfo.io/{ip_address}/json?token=xxx"
        response = requests.get(url)
        data = response.json()
        if languge == "zh-CN":
            return f"IP地址:{data['ip']}\n城市:{data['city']}\n地区:{data['region']}\n国家:{data['country']}"
        else:
            return f"IP Address: {data['ip']}\nCity: {data['city']}\nRegion: {data['region']}\nCountry: {data['country']}"
    except Exception as e:
        return f"查询IP地址失败:{str(e)}"

4. 定义路由节点

# 定义路由节点
def chat_with_model(state):  # 路由节点
    """Generate structured output"""
    print(state)
    print("\n---------------\n")
    messages = state["messages"]
    response = llm.invoke(messages) # 这里进行了修改(使用原始模型对原始结果进行响应)  不进行格式化的输出
    # structured_llm = llm.with_structured_output(FinalResponse)
    # response = structured_llm.invoke(messages)
    return {"messages": [response]}

5. 定义消息回复节点

def final_answer(state):
    """generate natural language responses"""
    messages = state["messages"][-1]
    return {"messages": [messages]}

6. 定义工具执行节点

def execute_function(state: AgentState):
    """Generate natural language responses"""
    tool_calls = state['messages'][-1].tool_calls
    results = []
    # 创建工具字典(名称必须与模型返回完全一致)
    tools = {
        "fetch_real_time_info": fetch_real_time_info,
        "get_weather": get_weather,
        "ip_lookup": ip_lookup
    }
    print(f"检测到工具调用请求:{[t['name'] for t in tool_calls]}")  # 调试输出
    for t in tool_calls:
        tool_name = t['name']
        if tool_name not in tools:
            result = f"无效的工具名称: {tool_name}"
        else:
            try:
                # 确保参数传递正确
                args = t['args']
                if 'args' in args:  # 处理部分模型返回的参数结构
                    args = args['args']
                result = tools[tool_name].invoke(args)
            except Exception as e:
                result = f"工具执行错误: {str(e)}"
        results.append(
            ToolMessage(
                tool_call_id=t['id'],
                name=tool_name,
                content=str(result)
            )
        )
    return {'messages': results}

5. 新增消息内容摘要节点

这里新增了一个消息内容总结节点,用于将从外部工具获取的数据进行格式化输出。

# 请你基于现在得到的信息,进行总结并生成专业的回复。请注意使用中文回复。
SYSTEM_PROMPT = """
Please summarize the information obtained so far and generate a professional response. Note, please reply in Chinese.
"""

def natural_response(state):
    """generate final language responses"""
    messages = state["messages"][-1] # 对任意一个工具调用的结果进行系统性地总结
    messages = [SystemMessage(content=SYSTEM_PROMPT)] + [HumanMessage(content=messages.content)]
    response = llm.invoke(messages)
    return {"messages": [response]}

6. 定义路由函数

下面的路由函数会判断message里面是否有tool_calls属性,如果缺少该属性则为自然语言回复,否则生成了工具内容调用的参数,需要执行外部工具调用。

# 定义一个Router Function
def exists_function_calling(state: AgentState):
    result = state['messages'][-1]
    return len(result.tool_calls) > 0  # 如果message里面没有tool_calls就是自然语言回复,否则就生成了工具调用的参数,那么就可以基于这个条件进行判断

7. 构建图结构

7.1 定义图的状态模式

# 定义图的状态模式
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage],operator.add]  # 这里也可以使用MessageGraph作为它的状态传递也是可以的

7.2 实例化图结构并添加节点


graph = StateGraph(AgentState)

graph.add_node("chat_with_model", chat_with_model)
graph.add_node("execute_function", execute_function)
graph.add_node("final_answer", final_answer)
graph.add_node("natural_response", natural_response)  # 新增一个摘要总结的节点

7.3 设置图的启动节点

# 设置图的启动节点
graph.set_entry_point("chat_with_model")

7.4 设置条件边与普通边

# 设置条件边
graph.add_conditional_edges(
    "chat_with_model",
    exists_function_calling,
    {True: "execute_function", False: "final_answer"}
)


# 因为natural_response是对两个节点进行自然语言的处理,所以将这两个节点的后面都连接到natural_response
graph.add_edge("execute_function", "natural_response")
graph.add_edge("final_answer", "natural_response")

7.5 设置图的终止节点

graph.set_finish_point("natural_response")

7.6 编译图

graph = graph.compile()

8. 绑定外部工具

tools = [fetch_real_time_info, get_weather, ip_lookup]

llm = llm.bind_tools(tools)  # 将工具绑定到大模型上

9. 测试代码

messages = [HumanMessage(content="请查询这个IP的地址位置:39.156.66.10")]
# messages = [HumanMessage(content="请你告诉我一下乌鲁木齐的天气情况。")]
# messages = [HumanMessage(content="特朗普的最新新闻。")]
result = graph.invoke({"messages": messages})
print(result["messages"][-1].content)
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐