使用Agent Chat UI + langgraph-api服务时,发起会话都第一个请求就是创建线程,本文对于创建线程相关代码进行分析。首先时梳理主脉络,与主脉络关系不大的代码暂不分析。

        1.数据模型

        创建线程相关表包括thread和thread_ttl两个,目前仅包括thread表,表定义如下:

        2.请求入口及数据

        用户发起会话前先创建线程,入口为http://{ip:port}/threads,请求数据为:

{"metadata":{}}

       3.源码分析

        对应的入口函数为api/threads.py中的create_thread方法,具体源码如下:

@retry_db
async def create_thread(
    request: ApiRequest,
):
    payload = await request.json(ThreadCreate)

    #从表单数据中获取thread_id,bing jiancha shifou wei UUID格式
    if thread_id := payload.get("thread_id"):
        validate_uuid(thread_id, "Invalid thread ID: must be a UUID")
    async with connect() as conn:
        thread_id = thread_id or str(uuid4())#直接使用表单中的thread_id或重新生成一个新的
        iter = await Threads.put(#完成线程创建
            conn,
            thread_id,
            metadata=payload.get("metadata"),
            if_exists=payload.get("if_exists") or "raise",
            ttl=payload.get("ttl"),
        )
        config = {#短期记忆支持。对应stream调用时的configurable,暂不考虑
            "configurable": {
                **get_configurable_headers(request.headers),
                "thread_id": thread_id,
            }
        }
        if supersteps := payload.get("supersteps"):#暂不分析
            try:
                await Threads.State.bulk(
                    conn,
                    config=config,
                    supersteps=supersteps,
                )
            except HTTPException as e:
                detail = f"Thread {thread_id} was created, but there were problems updating the state: {e.detail}"
                raise HTTPException(status_code=201, detail=detail) from e

    return ApiResponse(await fetchone(iter, not_found_code=409)) #返回线程数据

        主要逻辑在langgraph_runtime_postgres/ops.py文件的Threads.put方法,具体代码如下:

    @staticmethod
    async def put(
        conn: AsyncConnection[DictRow],
        thread_id: UUID,
        *,
        metadata: MetadataInput,
        if_exists: OnConflictBehavior,
        ttl: ThreadTTLConfig | None = None,
        ctx: Auth.types.BaseAuthContext | None = None,
    ) -> AsyncIterator[Thread]:
        metadata = metadata if metadata is not None else {}
        filters = await Threads.handle_event(#获取过滤器,一般情况下为空
            ctx,
            "create",
            Auth.types.ThreadsCreate(
                thread_id=thread_id, metadata=metadata, if_exists=if_exists
            ),
        )

        #ttl相关暂不考虑

        ttl_config = ttl if ttl is not None else THREAD_TTL
        ttl_interval_minutes = None
        ttl_strategy = None

        if ttl_config:
            if (
                ttl_strategy := ttl_config.get("strategy", "delete")
            ) and ttl_strategy != "delete":
                raise HTTPException(
                    status_code=422,
                    detail=f"Invalid thread TTL strategy: {ttl_strategy}. Expected one of ['delete']",
                )
            ttl_interval_minutes = ttl_config.get(
                "ttl", ttl_config.get("default_ttl", None)
            )
            if ttl_interval_minutes is None:
                ttl_strategy = None

        # 组织插入thread表的SQL语句
        query = """WITH inserted_thread as (
            INSERT INTO thread (thread_id, metadata)
            values (%(thread_id)s, %(metadata)s)
            ON CONFLICT (thread_id) DO NOTHING
            RETURNING *
        )
        """
        params = {
            "thread_id": thread_id,
            "metadata": Jsonb(metadata),
        }
        # 组织插入thread_ttl表的SQL
        if ttl_interval_minutes is not None:
            query += """,
            inserted_ttl as (
                INSERT INTO thread_ttl (thread_id, strategy, ttl_minutes)
                VALUES (
                    %(thread_id)s,
                    %(ttl_strategy)s,
                    %(ttl_interval)s
                )
                ON CONFLICT (thread_id, strategy) DO UPDATE SET
                    ttl_minutes = %(ttl_interval)s,
                    created_at = NOW() AT TIME ZONE 'UTC'
                RETURNING *
            )"""
            params.update(
                {
                    "ttl_interval": ttl_interval_minutes,
                    "ttl_strategy": ttl_strategy,
                }
            )

        query += """
        SELECT * FROM inserted_thread
        """

        if if_exists == "do_nothing":
            # return the row if it already exists
            filter_clause, filter_params = _build_filter_query(
                filters=filters, metadata_field="metadata"
            )
            where_clause = "WHERE thread_id = %(thread_id)s"
            if filter_params:
                params.update(filter_params)
                where_clause += filter_clause

            query += f"""
            UNION ALL
            SELECT * FROM thread
            {where_clause}
            LIMIT 1;
            """
        elif if_exists == "raise":
            # we'll raise downstream if there is a conflict
            pass

        cur = await conn.execute(query, params, binary=True)#把线程数据异步入库
        return (row async for row in cur)#返回数据库中的线程记录
 

        4.应答数据

        前端收到的应答数据如下:

{
    "thread_id": "a610dd44-99a0-4e49-8e9c-f8755e28f014",
    "created_at": "2025-09-24T01:49:07.401066+00:00",
    "updated_at": "2025-09-24T01:49:07.401066+00:00",
    "metadata": {},
    "status": "idle",
    "config": {},
    "values": null,
    "interrupts": {},
    "error": null
}

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐