Dify平台压测:Locust实现流式接口性能测试

在AI应用从原型走向生产落地的过程中,性能从来不是最后才考虑的问题。尤其当系统需要支撑成百上千的并发用户时,一个看似流畅的对话流程,可能在真实流量冲击下暴露出响应延迟、资源争抢甚至服务崩溃等隐患。

Dify作为一款可视化AI Agent与应用开发平台,让开发者无需深入代码即可编排复杂的RAG系统、智能体和文本生成逻辑。这种“低门槛”特性吸引了大量企业用于构建智能客服、知识问答、自动化内容生成等高并发场景。但正因如此,它的后端稳定性更需经受住严苛考验——毕竟,谁愿意自己的客服机器人在客户咨询高峰期卡成PPT?

为了验证Dify在真实业务负载下的表现,我们开展了一次完整的压力测试。目标很明确:
- 验证 8核16G 最小资源配置下的最大吞吐能力(TPS)
- 探索 16核32G 条件下的性能上限
- 构建一套可复用的流式接口压测方案,覆盖 /chat-messages 等关键SSE接口
- 找出瓶颈所在,并给出切实可行的调优建议

最终,我们不仅跑出了接近百TPS的稳定输出,还总结出一套适用于LLM应用的压力测试方法论。


为什么选择 Locust?

面对市面上琳琅满目的压测工具,我们曾犹豫过JMeter、k6、wrk……但最终选择了 Locust。原因很简单:它能原生支持流式响应。

工具 是否支持流式 编写灵活性 社区生态 上手难度
JMeter ❌(需插件) 中等
k6 高(JS)
wrk
Locust ✅(原生支持) 高(Python)

Locust的优势在于:

  • 用Python写脚本,灵活度极高:可以动态构造请求头、参数,甚至模拟真实用户的等待行为。
  • 原生支持 stream=True:轻松处理SSE分块数据,逐帧解析token返回过程。
  • Web UI实时监控:RPS、延迟分布、失败率一目了然,适合快速迭代调参。
  • 分布式架构友好:未来若需更大规模压测,可通过Master-Worker模式横向扩展。

更重要的是,我们可以借助 events.fire() 自定义上报指标,比如 TTFB(首字节时间) 和完整响应时间,在控制台中独立观察它们的变化趋势。


场景设计:不只是“你好,世界”

压测不能只测最简单的路径。我们围绕Dify三大典型业务场景构建测试矩阵:

简单ChatFlow:基础API层性能基线

仅包含“开始”与“直接回复”节点,固定输出一句话。这是评估核心API处理开销的理想起点,排除复杂逻辑干扰,专注测量框架本身的调度延迟。

复杂ChatFlow:真实Agent工作流模拟

包含条件判断、函数调用、插件执行(如天气查询)、向量检索等多个环节。所有外部依赖启用Mock模式,避免网络波动影响结果。这个场景用来检验多步骤任务调度的能力,尤其是上下文管理与异步协调机制。

文件召回场景:知识库检索性能探测

通过 /datasets/{id}/retrieve 接口发起混合搜索(hybrid_search),测试Weaviate向量数据库与全文检索组件的协同效率。上传一份PDF文档(含文字与图片),查询语义相关性较高的片段。此场景重点关注I/O性能与索引结构对响应速度的影响。


脚本实战:如何捕获流式响应的关键指标

以下是我们使用的完整Locust脚本,已整合三个主要场景:

from locust import HttpUser, TaskSet, task, between, events
import time
import json

class SimpleChatFlow(TaskSet):
    @task
    def chat_stream(self):
        url = "/v1/chat-messages"
        headers = {
            "Authorization": "Bearer app-KqXrYsNtLmJpZaWcVbQeTdUo",
            "Content-Type": "application/json"
        }
        payload = {
            "inputs": {},
            "query": "请简单介绍一下你自己",
            "response_mode": "streaming",
            "user": "load_test_user_001"
        }

        start_time = time.time()

        try:
            with self.client.post(
                url,
                json=payload,
                headers=headers,
                stream=True,
                catch_response=True,
                timeout=60
            ) as resp:
                if resp.status_code != 200:
                    resp.failure(f"Status {resp.status_code}")
                    return

                ttfb = time.time() - start_time
                print(f"[Simple] TTFB: {ttfb:.2f}s")

                chunk_count = 0
                for line in resp.iter_lines(decode_unicode=True):
                    if line.startswith("data:"):
                        chunk_count += 1
                        if chunk_count == 1:
                            events.fire(
                                "request",
                                name="TTFB",
                                response_time=int(ttfb * 1000),
                                response_length=0,
                                exception=None
                            )
                        total_time = time.time() - start_time
                        print(f"[Simple] Chunk #{chunk_count}: {line[:50]}... ({total_time:.2f}s)")

                    if '"message_end"' in line:
                        total_time_ms = int((time.time() - start_time) * 1000)
                        events.fire(
                            "request",
                            name="Full Response",
                            response_time=total_time_ms,
                            response_length=len(line),
                            exception=None
                        )
                        break
        except Exception as e:
            print(f"[Simple] Request failed: {e}")


class ComplexChatFlow(TaskSet):
    @task
    def complex_chat(self):
        url = "/v1/chat-messages"
        headers = {
            "Authorization": "Bearer app-ZxWnMpLkQjHgTrFeSaNdVcBi",
            "Content-Type": "application/json"
        }
        payload = {
            "inputs": {"dept": "IT"},
            "query": "我需要申请一台新电脑,请指导我完成流程",
            "response_mode": "streaming",
            "user": "load_test_user_002"
        }

        start_time = time.time()
        try:
            with self.client.post(
                url,
                json=payload,
                headers=headers,
                stream=True,
                catch_response=True,
                timeout=120
            ) as resp:
                if resp.status_code != 200:
                    resp.failure(f"Status {resp.status_code}")
                    return

                ttfb = time.time() - start_time
                print(f"[Complex] TTFB: {ttfb:.2f}s")

                for line in resp.iter_lines(decode_unicode=True):
                    if line.startswith("data:"):
                        data_str = line[5:].strip()
                        if data_str == "[DONE]":
                            continue
                        try:
                            data_json = json.loads(data_str)
                            event = data_json.get("event")
                            if event == "message_token":
                                print(f"[Complex] Streaming token received.")
                            elif event == "agent_thought":
                                print(f"[Complex] Agent is thinking...")
                        except:
                            pass
                    if '"message_end"' in line:
                        total_time = time.time() - start_time
                        print(f"[Complex] Full response time: {total_time:.2f}s")
                        break
        except Exception as e:
            print(f"[Complex] Request failed: {e}")


class FileRetrieval(TaskSet):
    @task
    def retrieve_file(self):
        url = "/v1/datasets/da0bcf35-abc5-4c77-8e2b-4e890b93b61c/retrieve"
        headers = {
            "Authorization": "Bearer dataset-pqrDBWoy9UILq7zbHnCkN3dY",
            "Content-Type": "application/json"
        }
        payload = {
            "query": "流程审批是什么?如果有图片,请一起返回",
            "retrieval_model": {
                "search_method": "hybrid_search",
                "reranking_enable": False,
                "score_threshold_enabled": False
            }
        }

        start_time = time.time()
        try:
            resp = self.client.post(url, json=payload, headers=headers, timeout=30)
            if resp.status_code == 200:
                ttfb = time.time() - start_time
                print(f"[Retrieve] Success, TTFB: {ttfb:.2f}s")
            else:
                print(f"[Retrieve] Failed: {resp.status_code}, {resp.text}")
        except Exception as e:
            print(f"[Retrieve] Request failed: {e}")


class ChatUser(HttpUser):
    tasks = [SimpleChatFlow, ComplexChatFlow]
    wait_time = between(1, 3)

class RetrieveUser(HttpUser):
    tasks = [FileRetrieval]
    wait_time = between(2, 5)

📝 关键点说明:
- 使用 events.fire()TTFB完整响应时间 单独上报,便于在Web UI中对比分析。
- iter_lines(decode_unicode=True) 实现逐行解析SSE流,精准捕捉第一个token到达时刻。
- 支持多个用户类并行运行不同场景任务,贴近真实混合负载。


压测执行:从50到150并发的演进

启动命令如下:

locust -f locustfile.py --host=http://your-dify-api-host.com

访问 http://localhost:8089 进入控制台,设置参数:

  • User Count: 依次测试 50 / 100 / 150 并发用户
  • Spawn Rate: 每秒启动5个用户,避免瞬间冲击
  • User Class: 切换为 ChatUserRetrieveUser
  • 持续时间: 每轮压测运行5分钟,确保进入稳态

同时开启Prometheus + Grafana监控集群资源使用情况,重点关注:

维度 监控重点
CPU 各容器CPU使用率(dify-api, dify-worker, postgres)
内存 RSS、Swap是否触发OOM
网络IO 容器间带宽占用、公网出向流量峰值
磁盘IO PostgreSQL WAL写入延迟、Weaviate段合并速度
日志 慢SQL、GC日志、连接池等待

这些数据帮助我们在性能下降时快速定位瓶颈。


8核16G调优之路:一步步逼近极限

测试环境部署于Kubernetes集群,节点规格为 8C16G,存储采用NFS共享盘。

第一次压测:初步暴露瓶颈

初始资源配置较为保守:

服务 CPU 内存(MiB)
dify-api 0.8 2048
dify-worker 0.8 1024
dify-postgres 0.5 1024
其他 均匀分配 ≤16384

压测100并发,结果如下:

Name Requests Fails Avg (ms) 95%ile RPS
/v1/chat-messages 5123 0 2345 4200 21.5

问题很明显:dify-api CPU利用率一度飙至98%,成为首要瓶颈。

第二次压测:提升API层资源

dify-api 提升至1.2核,其他不变。

结果:RPS上升至 30.8,提升约43%。证实计算资源确实不足。

第三次压测:双核+双Worker进程

进一步将 dify-api 设为 2核2GB,并设置 SERVER_WORKER_AMOUNT=2,启用多进程处理。

结果:RPS跃升至 49.5,几乎翻倍。但再往上增长乏力。

第四次压测:优化PostgreSQL配置

尝试调整数据库参数:

shared_buffers = 512MB
work_mem = 32MB
max_connections = 200
checkpoint_timeout = 10min
log_min_duration_statement = 500ms

效果微弱,RPS仅小幅提升至54.1。怀疑I/O受限。

第五次压测:升级数据库硬件

dify-postgres 改用本地SSD挂载,资源提升至 1核2GB

意外发生了:RPS反而跌至48.4。查看 pg_stat_activity 发现大量事务锁等待。

原来是高频短事务导致连接竞争加剧,原有连接池策略已不适用。

第六次压测:引入PgBouncer连接池

部署PgBouncer,配置:

pool_mode = transaction
max_client_conn = 150
default_pool_size = 50

再次压测,性能趋于稳定,确认已达当前资源配置下的极限。


8核16G最优配置与实测TPS

经过六轮迭代,得出该配置下的最佳实践:

服务 CPU 内存(MiB) 备注
dify-api 2 2048 SERVER_WORKER_AMOUNT=2
dify-postgres 0.5 1024 PgBouncer前置
dify-worker 0.8 1024 -
dify-plugin-daemon 0.8 2772 -
xinference/ollama 各0.5 各2048 -
其他 合理分配 - 总计≤8核16G

实测性能汇总

简单ChatFlow场景
  • TPS:48.4
  • 平均响应时间:1,437 ms
  • 95%响应时间:3,900 ms
复杂ChatFlow场景
  • TPS:20.7
  • 平均响应时间:5,689 ms
  • 主要耗时集中在插件调用与向量检索
文件召回场景
  • TPS:17.6
  • 平均响应时间:5,287 ms
  • Weaviate段合并频繁,影响查询效率

💡 当前瓶颈主要在 dify-api 计算能力和NFS存储I/O性能。


16核32G探索:突破百TPS大关

为进一步挖掘潜力,我们将节点升级至 16核32G,开启新一轮调优。

第一次压测:盲目扩容反致性能下滑

dify-api 提升至4核4GB,SERVER_WORKER_AMOUNT=4,却发现RPS降至22.9。

排查发现 dify-plugin-daemon 出现大量慢SQL,拖累整体表现。

第二次压测:强化数据库性能

将PostgreSQL升级至 2核4GB,优化 work_mem=64MBeffective_cache_size=8GB

RPS回升至 91.3,说明数据库已成为新瓶颈。

第三~六次压测:实例数与资源配置博弈

我们尝试多种组合:
- 多实例 vs 高配单实例
- Worker数量调整
- Weaviate独立部署

最终确定最优方案:

  • dify-api: 3实例 × (2核 / 2GB),每实例 SERVER_WORKER_AMOUNT=2
  • dify-postgres: 2核 / 3GB
  • dify-weaviate: 1核 / 2GB(独立部署)
  • xinference: 1核 / 2GB

结果令人振奋:

场景 TPS 峰值
简单ChatFlow 98.4 124
复杂ChatFlow 61.2 -
文件召回 44.3 -

🎉 成功突破百TPS!


16核32G最终推荐配置

服务 CPU 内存 参数
dify-api ×3 2×3 2048×3 SERVER_WORKER_AMOUNT=2
dify-postgres 2 3072 work_mem=64MB
dify-weaviate 1 2048 独立部署
xinference 1 2048 -
ollama 1 2048 -
其他 合理分配 - -
合计 15 ~24GB

剩余资源可用于部署Redis缓存或边缘网关。


更高TPS的进阶优化建议

若希望进一步提升吞吐能力,可考虑以下方向:

  1. 横向扩展API层:结合负载均衡实现水平扩容,彻底突破单机限制。
  2. 数据库独立部署:将PostgreSQL与Weaviate迁移到专用高性能实例,避免资源争抢。
  3. 引入Redis缓存:对高频知识库查询结果进行缓存,降低数据库压力。
  4. 优化Weaviate HNSW索引参数:调整 ef, max_connections 提升检索效率。
  5. 异步化长任务:将复杂Agent流程拆分为后台任务,前端仅返回任务ID,提高接口响应速度。
  6. CDN或边缘计算前置:对于静态资源或轻量查询,可在边缘节点缓存响应,减少核心服务负担。

⚠️ 注意:以上测试未包含实际LLM推理耗时。若接入Qwen、Llama3等模型,需根据显存需求合理分配 xinferenceollama 资源,否则将成为新的性能黑洞。


Dify的价值在于“可视化编排”,但这并不意味着我们可以忽视底层性能工程。相反,正是因为它封装了太多细节,才更需要我们主动去揭开盖子,看清每一毫秒的消耗来自何处。

本次压测不仅是对Dify的一次体检,更是为所有LLM应用开发者提供了一份通用的方法论:
- 用Locust做流式接口测试是完全可行且高效的;
- 性能优化是一个渐进过程,必须结合监控数据持续迭代;
- 架构设计比参数调优更重要——合理的服务拆分与资源隔离,往往比单纯加CPU更有意义。

如果你正在基于Dify构建企业级AI应用,不妨从一次系统的压测开始。别等到上线那天才发现,“低代码”背后藏着“高风险”。

👉 后续我们将继续分享《Dify高可用部署架构》《多租户性能隔离实践》《成本-性能平衡模型》等内容,欢迎关注。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐