langgraph-api源码分析1-创建线程
本文分析了AgentChatUI+langgraph-api服务中创建线程的代码实现。主要流程包括:1)通过HTTP接口接收创建请求;2)在Threads.put方法中处理核心逻辑,包括生成线程ID、配置元数据、设置TTL等;3)使用SQL语句将数据插入thread和thread_ttl表;4)返回包含线程ID、创建时间等信息的响应数据。代码实现了线程创建的基础功能,并预留了TTL管理和状态更新等
使用Agent Chat UI + langgraph-api服务时,发起会话都第一个请求就是创建线程,本文对于创建线程相关代码进行分析。首先时梳理主脉络,与主脉络关系不大的代码暂不分析。
1.数据模型
创建线程相关表包括thread和thread_ttl两个,目前仅包括thread表,表定义如下:

2.请求入口及数据
用户发起会话前先创建线程,入口为http://{ip:port}/threads,请求数据为:
{"metadata":{}}
3.源码分析
对应的入口函数为api/threads.py中的create_thread方法,具体源码如下:
@retry_db
async def create_thread(
request: ApiRequest,
):
payload = await request.json(ThreadCreate)#从表单数据中获取thread_id,bing jiancha shifou wei UUID格式
if thread_id := payload.get("thread_id"):
validate_uuid(thread_id, "Invalid thread ID: must be a UUID")
async with connect() as conn:
thread_id = thread_id or str(uuid4())#直接使用表单中的thread_id或重新生成一个新的
iter = await Threads.put(#完成线程创建
conn,
thread_id,
metadata=payload.get("metadata"),
if_exists=payload.get("if_exists") or "raise",
ttl=payload.get("ttl"),
)
config = {#短期记忆支持。对应stream调用时的configurable,暂不考虑
"configurable": {
**get_configurable_headers(request.headers),
"thread_id": thread_id,
}
}
if supersteps := payload.get("supersteps"):#暂不分析
try:
await Threads.State.bulk(
conn,
config=config,
supersteps=supersteps,
)
except HTTPException as e:
detail = f"Thread {thread_id} was created, but there were problems updating the state: {e.detail}"
raise HTTPException(status_code=201, detail=detail) from ereturn ApiResponse(await fetchone(iter, not_found_code=409)) #返回线程数据
主要逻辑在langgraph_runtime_postgres/ops.py文件的Threads.put方法,具体代码如下:
@staticmethod
async def put(
conn: AsyncConnection[DictRow],
thread_id: UUID,
*,
metadata: MetadataInput,
if_exists: OnConflictBehavior,
ttl: ThreadTTLConfig | None = None,
ctx: Auth.types.BaseAuthContext | None = None,
) -> AsyncIterator[Thread]:
metadata = metadata if metadata is not None else {}
filters = await Threads.handle_event(#获取过滤器,一般情况下为空
ctx,
"create",
Auth.types.ThreadsCreate(
thread_id=thread_id, metadata=metadata, if_exists=if_exists
),
)#ttl相关暂不考虑
ttl_config = ttl if ttl is not None else THREAD_TTL
ttl_interval_minutes = None
ttl_strategy = Noneif ttl_config:
if (
ttl_strategy := ttl_config.get("strategy", "delete")
) and ttl_strategy != "delete":
raise HTTPException(
status_code=422,
detail=f"Invalid thread TTL strategy: {ttl_strategy}. Expected one of ['delete']",
)
ttl_interval_minutes = ttl_config.get(
"ttl", ttl_config.get("default_ttl", None)
)
if ttl_interval_minutes is None:
ttl_strategy = None# 组织插入thread表的SQL语句
query = """WITH inserted_thread as (
INSERT INTO thread (thread_id, metadata)
values (%(thread_id)s, %(metadata)s)
ON CONFLICT (thread_id) DO NOTHING
RETURNING *
)
"""
params = {
"thread_id": thread_id,
"metadata": Jsonb(metadata),
}
# 组织插入thread_ttl表的SQL
if ttl_interval_minutes is not None:
query += """,
inserted_ttl as (
INSERT INTO thread_ttl (thread_id, strategy, ttl_minutes)
VALUES (
%(thread_id)s,
%(ttl_strategy)s,
%(ttl_interval)s
)
ON CONFLICT (thread_id, strategy) DO UPDATE SET
ttl_minutes = %(ttl_interval)s,
created_at = NOW() AT TIME ZONE 'UTC'
RETURNING *
)"""
params.update(
{
"ttl_interval": ttl_interval_minutes,
"ttl_strategy": ttl_strategy,
}
)query += """
SELECT * FROM inserted_thread
"""if if_exists == "do_nothing":
# return the row if it already exists
filter_clause, filter_params = _build_filter_query(
filters=filters, metadata_field="metadata"
)
where_clause = "WHERE thread_id = %(thread_id)s"
if filter_params:
params.update(filter_params)
where_clause += filter_clausequery += f"""
UNION ALL
SELECT * FROM thread
{where_clause}
LIMIT 1;
"""
elif if_exists == "raise":
# we'll raise downstream if there is a conflict
passcur = await conn.execute(query, params, binary=True)#把线程数据异步入库
return (row async for row in cur)#返回数据库中的线程记录
4.应答数据
前端收到的应答数据如下:
{
"thread_id": "a610dd44-99a0-4e49-8e9c-f8755e28f014",
"created_at": "2025-09-24T01:49:07.401066+00:00",
"updated_at": "2025-09-24T01:49:07.401066+00:00",
"metadata": {},
"status": "idle",
"config": {},
"values": null,
"interrupts": {},
"error": null
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)