1. 项目概述:当ChatGPT遇上CQRS

最近在设计和实现一个需要与大型语言模型(LLM)深度集成的系统时,我遇到了一个典型的架构挑战:如何优雅地处理用户与AI之间复杂的、状态化的交互流程?比如,一个用户向ChatGPT发出一个指令“帮我分析上周的销售数据并生成报告”,这背后可能涉及多个步骤——查询数据库、执行计算、生成文本、甚至调用外部API。如果所有逻辑都塞在一个庞大的服务里,代码很快就会变得难以维护,尤其是当需要区分“用户指令的解析”(查询)和“报告生成任务的执行”(命令)时。

这正是CQRS(命令查询职责分离)模式大显身手的地方。这个项目,我称之为“ChatGPT Implements Work With Users Using the CQRS Pattern”,核心就是探讨如何将CQRS的思想应用于构建与ChatGPT(或同类LLM)协作的、健壮的后端服务。它不是简单地在ChatGPT外面套一个API网关,而是从领域逻辑层面,将用户与AI的交互清晰地分解为“命令”(触发一个需要改变系统状态或执行复杂流程的动作)和“查询”(获取信息、请求解释或进行简单对话)。通过这种分离,我们能够获得更好的性能、更清晰的代码结构,以及应对复杂工作流的强大能力。无论你是在构建AI客服、智能助手后台,还是任何需要将LLM作为核心“工作者”集成到业务系统中的开发者,这套思路都能为你提供一个坚实且可扩展的架构蓝图。

2. 核心架构思路:为什么CQRS是AI集成的绝配

在深入代码之前,我们必须先理解为什么CQRS模式特别适合处理与ChatGPT的交互。传统的CRUD架构在面对LLM时常常显得力不从心,主要原因在于LLM交互的本质是异步的、有状态的,且包含明显的“意图”与“执行”的分离。

2.1 传统CRUD的瓶颈与CQRS的优势

想象一个简单的场景:用户请求“总结我的未读邮件”。在CRUD模型下,一个控制器可能同时负责:1)验证用户身份和权限(命令侧),2)调用邮件API获取数据(查询侧),3)构造Prompt发送给ChatGPT(命令侧),4)等待AI响应并返回(查询侧),5)可能还要将结果缓存或记录日志(命令侧)。这种混杂的职责使得服务难以测试、扩展,并且当AI处理耗时较长时,会阻塞整个HTTP请求线程。

CQRS通过强制性的职责分离解决了这些问题:

  • 命令(Command) :负责“做某事”,会改变系统状态。它应该是异步的、无返回值的(或仅返回一个任务ID)。在我们的上下文中,所有触发AI执行具体任务、写数据库、调用外部写操作API的请求,都是命令。例如,“生成销售报告”、“根据对话历史更新用户画像”。
  • 查询(Query) :负责“读数据”,不会改变系统状态。它应该是同步的、快速返回结果的。例如,“获取刚才那个报告生成的进度”、“列出我所有与AI的对话历史”、“向AI提出一个简单的知识性问题”。

这种分离带来了几个直接好处:

  1. 模型优化 :命令模型和查询模型可以针对各自的工作负载进行独立优化。查询模型可以极度简化,甚至直接映射到数据库的只读副本或缓存视图,以实现毫秒级响应。
  2. 复杂度管理 :将复杂的业务逻辑(尤其是涉及多步AI调用和状态转换的)隔离在命令端,保持查询端的简单与稳定。
  3. 伸缩性 :命令处理和查询处理可以独立伸缩。如果AI任务繁重,可以横向扩展命令处理器;如果查询请求量大,可以增强查询端的缓存和数据库读副本。

2.2 领域驱动设计(DDD)与CQRS的协同

CQRS常常与事件溯源(Event Sourcing)结合,但在这个项目中,我们采用更轻量级、更实用的方法:将CQRS与DDD的聚合根(Aggregate Root)和领域事件(Domain Event)结合。我们将用户与AI的一次“工作任务”(Work Session)视为一个聚合根。这个聚合根会接收各种命令(如 StartAnalysisCommand ProvideAdditionalInfoCommand ),并发布相应的领域事件(如 AnalysisStartedEvent AITaskDispatchedEvent WorkCompletedEvent )。

这些领域事件是系统的脊梁。它们不仅用于在聚合内部驱动状态变化,更重要的是,它们会被发布到消息总线(如RabbitMQ, Kafka),从而触发后续的 进程管理器(Process Manager) 或** Saga**。进程管理器是协调复杂、长期运行工作流的核心模式,它监听事件,决定下一步该发送什么命令。这正是管理多步AI交互的理想抽象。

注意 :不要一开始就引入事件溯源(Event Sourcing)。除非你有强烈的审计、时间旅行调试需求,否则事件溯源的复杂性可能会超过其收益。对于大多数AI集成场景,用领域事件驱动流程,并用常规的数据库持久化聚合的“当前状态”,是更务实的选择。

3. 技术栈选型与核心组件设计

基于上述架构思路,我们选择了一套能够支撑高并发、异步处理的技术栈。选型的核心原则是:解耦、异步、可观测。

3.1 后端技术栈详解

  • 语言与框架 :我选择了 C# / .NET 8 ASP.NET Core 。.NET的强类型系统、优秀的异步编程模型(async/await)以及对依赖注入的原生支持,非常适合构建结构清晰的领域模型。当然,Java/Spring Boot, Node.js/NestJS, Python/FastAPI也都是绝佳选择,核心模式是通用的。
  • 命令与查询总线 :使用 MediatR 库。它是一个轻量级的进程内中介者模式实现,能完美地将命令/查询的发送与处理解耦。发送一个 ICommand IQuery ,由对应的 IRequestHandler 来处理,无需知道具体实现。
  • 持久化
    • 命令侧 :使用 Entity Framework Core Dapper 将聚合根的状态持久化到关系型数据库(如PostgreSQL, SQL Server)的“写库”。表结构围绕聚合根设计。
    • 查询侧 :使用 Dapper 或EF Core的只读上下文,连接到一个 只读副本数据库 。查询模型是面向视图的,可能是一张扁平化的表,或者是一个专门优化的查询视图。
  • 消息总线与事件处理 :使用 MassTransit Brighter 。它们建立在RabbitMQ或Azure Service Bus之上,提供了强大的消息发布/订阅、重试、死信队列等功能,用于发布领域事件和实现进程管理器。
  • 与ChatGPT集成 :使用 OpenAI .NET SDK Azure OpenAI SDK 。关键是要将其封装在 领域服务 中,而不是在处理器里直接调用。这个服务负责构造Prompt、处理Token限制、解析响应,并返回结构化的结果。
  • 缓存 Redis 。用于缓存频繁的查询结果(如常见的AI问答对)、用户会话上下文,以及作为进程管理器状态的临时存储。
  • API网关与通信 :除了标准的RESTful API用于触发命令和简单查询,强烈建议为需要实时进度更新的场景(如报告生成)提供 SignalR 支持,实现服务器向客户端的主动推送。

3.2 核心领域模型设计

让我们定义一个核心聚合根: WorkSession

public class WorkSession : AggregateRoot<Guid> // 假设AggregateRoot是一个基类
{
    public Guid Id { get; private set; }
    public string UserId { get; private set; }
    public WorkSessionStatus Status { get; private set; } // e.g., Draft, Running, WaitingForInput, Completed, Failed
    public string CurrentObjective { get; private set; } // 当前任务目标
    public List<ConversationTurn> ConversationHistory { get; private set; } // 对话历史
    public Dictionary<string, object> ContextData { get; private set; } // 上下文数据(如已收集的信息)
    public string? FinalResult { get; private set; } // 最终结果

    // 命令处理方法
    public void StartAnalysis(StartAnalysisCommand command)
    {
        if (Status != WorkSessionStatus.Draft)
            throw new InvalidOperationException("Session already started.");

        CurrentObjective = command.Objective;
        Status = WorkSessionStatus.Running;
        AddConversationTurn("user", command.UserInput);

        // 发布领域事件
        AddDomainEvent(new AnalysisStartedEvent(Id, UserId, CurrentObjective));
        AddDomainEvent(new AITaskDispatchedEvent(Id, "InitialAnalysis", ConversationHistory));
    }

    public void HandleAIResponse(AIResponseReceivedEvent @event)
    {
        AddConversationTurn("assistant", @event.ResponseContent);
        // 根据AI响应内容,可能更新状态、发布新事件
        if (@event.SuggestsNextAction == "need_more_info")
        {
            Status = WorkSessionStatus.WaitingForInput;
            AddDomainEvent(new WaitingForUserInputEvent(Id, @event.RequiredInfo));
        }
        else if (@event.SuggestsNextAction == "complete")
        {
            FinalResult = @event.ResponseContent;
            Status = WorkSessionStatus.Completed;
            AddDomainEvent(new WorkCompletedEvent(Id, FinalResult));
        }
    }
    // ... 其他命令处理方法
}

这个 WorkSession 聚合根是保证一致性的边界。所有改变其状态的操作,都必须通过它的方法(响应命令)来完成。

4. 命令端实现:驱动AI工作流引擎

命令端是整个系统的驱动者。它的职责是接收用户意图,通过聚合根验证业务规则,然后发布事件,触发后续的AI调用和流程控制。

4.1 命令处理器(Command Handler)的实现

一个典型的命令处理器,例如处理 StartAnalysisCommand

public class StartAnalysisCommandHandler : IRequestHandler<StartAnalysisCommand, Guid>
{
    private readonly IRepository<WorkSession> _sessionRepository;
    private readonly IPublishEndpoint _publishEndpoint; // MassTransit 接口

    public async Task<Guid> Handle(StartAnalysisCommand request, CancellationToken cancellationToken)
    {
        // 1. 创建或获取聚合根
        var session = WorkSession.StartNew(request.UserId, request.Objective, request.InitialInput);
        
        // 2. 持久化聚合根的新状态
        await _sessionRepository.SaveAsync(session, cancellationToken);
        
        // 3. 发布聚合根产生的所有领域事件
        foreach (var domainEvent in session.DomainEvents)
        {
            // 将领域事件转换为集成事件(可选,添加更多上下文)
            var integrationEvent = new IntegrationEventWrapper(domainEvent);
            await _publishEndpoint.Publish(integrationEvent, cancellationToken);
        }
        session.ClearDomainEvents();
        
        // 4. 返回会话ID,客户端可以用它来查询进度
        return session.Id;
    }
}

这里的关键是,处理器 不直接调用AI 。它只负责更新领域状态并发布事件。AI调用是由监听这些事件的 领域事件处理器 来触发的。

4.2 领域事件处理器与AI服务封装

接下来,我们创建一个处理器来响应 AITaskDispatchedEvent

public class AITaskDispatchedEventHandler : IConsumer<AITaskDispatchedEvent> // MassTransit的消费者接口
{
    private readonly IAIService _aiService;
    private readonly IRepository<WorkSession> _sessionRepository;
    private readonly IPublishEndpoint _publishEndpoint;

    public async Task Consume(ConsumeContext<AITaskDispatchedEvent> context)
    {
        var @event = context.Message;
        var session = await _sessionRepository.GetByIdAsync(@event.SessionId);
        
        // 调用封装的AI服务
        var aiResponse = await _aiService.ProcessTaskAsync(
            @event.TaskType,
            session.ConversationHistory,
            session.ContextData
        );
        
        // 根据AI响应,向聚合根发送一个新的“内部命令”(通过发布事件)
        var responseEvent = new AIResponseReceivedEvent(
            @event.SessionId,
            aiResponse.Content,
            aiResponse.SuggestedNextAction,
            aiResponse.RequiredParameters
        );
        
        await _publishEndpoint.Publish(responseEvent);
    }
}

IAIService 是一个领域服务,它封装了所有与OpenAI API的交互细节:

public class OpenAIService : IAIService
{
    private readonly IOpenAIClient _client;
    private readonly IPromptTemplateEngine _templateEngine;

    public async Task<AIResponse> ProcessTaskAsync(string taskType, List<ConversationTurn> history, Dictionary<string, object> context)
    {
        // 1. 根据任务类型选择Prompt模板
        var promptTemplate = GetTemplate(taskType);
        
        // 2. 使用上下文数据渲染Prompt
        var fullPrompt = _templateEngine.Render(promptTemplate, new {
            History = history,
            Context = context
        });
        
        // 3. 调用API,注意处理速率限制、超时和重试
        var chatRequest = new ChatCompletionRequest {
            Messages = BuildMessagesFromPrompt(fullPrompt),
            Model = "gpt-4",
            Temperature = 0.7,
            MaxTokens = 2000
        };
        
        var response = await _client.GetChatCompletionAsync(chatRequest);
        
        // 4. 解析响应,可能使用JSON模式(如OpenAI的function calling)来获取结构化输出
        var structuredOutput = ParseAIResponse(response.Choices.First().Message.Content);
        
        return new AIResponse {
            Content = structuredOutput.Answer,
            SuggestedNextAction = structuredOutput.NextAction,
            RequiredParameters = structuredOutput.NeededInfo
        };
    }
}

实操心得 :在Prompt模板中,明确指示AI以特定JSON格式返回,可以极大简化后续的解析逻辑。利用OpenAI的 response_format 参数或Function Calling特性,能获得更稳定、结构化的输出。

5. 查询端实现:高效的数据读取与状态展示

查询端的设计目标是快和简单。它不关心业务逻辑,只关心如何以最合适的形式把数据呈现给客户端。

5.1 查询模型与只读存储

查询端的数据模型应该完全根据前端或客户端的需要来设计。例如,一个 WorkSessionProgressView

-- 在只读副本数据库中的一个视图或表
CREATE VIEW vw_WorkSessionProgress AS
SELECT 
    ws.Id,
    ws.UserId,
    ws.Status,
    ws.CurrentObjective,
    ws.LastUpdatedAt,
    -- 计算字段,如进度百分比(这是一个简化示例,实际进度可能更复杂)
    CASE 
        WHEN ws.Status = 'Completed' THEN 100
        WHEN ws.Status = 'Running' THEN 50 -- 可能从其他表计算
        ELSE 0
    END as ProgressPercentage,
    -- 最新的AI回复摘要
    (SELECT TOP 1 Content FROM ConversationTurns WHERE SessionId = ws.Id AND Role = 'assistant' ORDER BY TurnNumber DESC) as LastAIMessage
FROM WriteDatabase.WorkSessions ws; -- 假设从写库同步过来

这个视图扁平化了 WorkSession 聚合及其相关的 ConversationTurn ,查询效率极高。

5.2 查询处理器与缓存策略

对应的查询处理器非常简单:

public class GetSessionProgressQueryHandler : IRequestHandler<GetSessionProgressQuery, WorkSessionProgressDto>
{
    private readonly IQuerySessionRepository _queryRepo;
    private readonly IDistributedCache _cache;

    public async Task<WorkSessionProgressDto> Handle(GetSessionProgressQuery request, CancellationToken ct)
    {
        var cacheKey = $"session_progress:{request.SessionId}";
        
        // 尝试从缓存读取
        var cached = await _cache.GetStringAsync(cacheKey, ct);
        if (cached != null)
        {
            return JsonSerializer.Deserialize<WorkSessionProgressDto>(cached);
        }
        
        // 缓存未命中,查询数据库
        var progress = await _queryRepo.GetProgressAsync(request.SessionId, ct);
        
        if (progress != null)
        {
            // 写入缓存,设置较短的过期时间,因为进度更新频繁
            await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(progress), new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(30) // 30秒后过期
            }, ct);
        }
        
        return progress;
    }
}

对于对话历史这种可能较长的数据,可以采用分页查询,并且只缓存最近的N条。

5.3 实时进度更新:SignalR集成

对于长时间运行的AI任务,轮询查询进度对用户体验不友好。我们可以使用SignalR在状态发生变化时主动推送。

在命令端,当 WorkSession 的状态发生变化并发布 WorkSessionUpdatedEvent 时,一个专门的事件处理器会捕获这个事件,并通过SignalR Hub通知连接到该会话的所有客户端。

public class WorkSessionUpdatedEventHandler : IConsumer<WorkSessionUpdatedEvent>
{
    private readonly IHubContext<WorkSessionHub> _hubContext;

    public async Task Consume(ConsumeContext<WorkSessionUpdatedEvent> context)
    {
        var @event = context.Message;
        // 通知该会话的所有客户端
        await _hubContext.Clients.Group(@event.SessionId.ToString())
                                 .SendAsync("ProgressUpdated", new { @event.SessionId, @event.NewStatus, @event.Progress });
    }
}

客户端在发起任务后,连接到Hub并加入以 SessionId 命名的组,即可实时接收更新。

6. 进程管理器:编排复杂多步AI工作流

这是整个架构中最能体现价值的部分。当用户的任务需要多个AI调用步骤,并且中间可能需要用户介入时,一个简单的“发布-订阅”事件链会变得难以管理。进程管理器(或Saga)就是用来协调这种长期运行业务流程的模式。

假设我们有一个“数据获取-分析-报告生成”的工作流:

  1. 用户请求分析销售数据。
  2. AI需要先查询数据库获取原始数据(命令1)。
  3. 拿到数据后,AI进行分析(命令2)。
  4. 分析结果需要用户确认某个指标。
  5. 用户确认后,AI生成最终报告(命令3)。

6.1 进程管理器的状态机实现

我们可以使用 状态机 来建模这个流程。这里使用MassTransit的 Automatonymous 库来定义一个 ReportGenerationSaga

public class ReportGenerationSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; } // 对应 WorkSession Id
    public string CurrentState { get; set; }
    public string Objective { get; set; }
    public string RawData { get; set; }
    public string AnalysisResult { get; set; }
    public bool UserConfirmed { get; set; }
}

public class ReportGenerationSaga : MassTransitStateMachine<ReportGenerationSagaState>
{
    // 定义事件
    public Event<AnalysisStartedEvent> AnalysisStarted { get; private set; }
    public Event<DataFetchedEvent> DataFetched { get; private set; }
    public Event<AnalysisCompletedEvent> AnalysisCompleted { get; private set; }
    public Event<UserConfirmationReceivedEvent> UserConfirmationReceived { get; private set; }
    public Event<ReportGeneratedEvent> ReportGenerated { get; private set; }

    // 定义状态
    public State AwaitingDataFetch { get; private set; }
    public State AwaitingAnalysis { get; private set; }
    public State AwaitingUserConfirmation { get; private set; }
    public State AwaitingReportGeneration { get; private set; }
    public State Completed { get; private set; }

    public ReportGenerationSaga()
    {
        InstanceState(x => x.CurrentState);

        // 流程起点:分析开始事件
        Initially(
            When(AnalysisStarted)
                .Then(context => {
                    context.Instance.Objective = context.Data.Objective;
                })
                .PublishAsync(context => context.Init<FetchDataCommand>(new {
                    SessionId = context.Instance.CorrelationId,
                    // 根据Objective构造查询参数...
                }))
                .TransitionTo(AwaitingDataFetch)
        );

        // 步骤1:数据获取完成
        During(AwaitingDataFetch,
            When(DataFetched)
                .Then(context => context.Instance.RawData = context.Data.RawDataJson)
                .PublishAsync(context => context.Init<PerformAnalysisCommand>(new {
                    SessionId = context.Instance.CorrelationId,
                    RawData = context.Instance.RawData
                }))
                .TransitionTo(AwaitingAnalysis)
        );

        // 步骤2:分析完成,需要用户确认
        During(AwaitingAnalysis,
            When(AnalysisCompleted)
                .Then(context => {
                    context.Instance.AnalysisResult = context.Data.Result;
                    // 这里可以发布一个事件,通知前端需要用户确认
                })
                // 发布一个事件,让前端展示确认界面
                .PublishAsync(context => context.Init<RequestUserConfirmationEvent>(new {
                    SessionId = context.Instance.CorrelationId,
                    Question = "是否确认指标X?",
                    AnalysisResult = context.Instance.AnalysisResult
                }))
                .TransitionTo(AwaitingUserConfirmation)
        );

        // 步骤3:收到用户确认,生成报告
        During(AwaitingUserConfirmation,
            When(UserConfirmationReceived)
                .Then(context => context.Instance.UserConfirmed = context.Data.Confirmed)
                .If(context => context.Instance.UserConfirmed,
                    thenBinder => thenBinder
                        .PublishAsync(context => context.Init<GenerateReportCommand>(new {
                            SessionId = context.Instance.CorrelationId,
                            AnalysisResult = context.Instance.AnalysisResult
                        }))
                        .TransitionTo(AwaitingReportGeneration)
                )
                .Else(/* 用户拒绝,可以结束或回到上一步 */)
        );

        // 步骤4:报告生成完成,流程结束
        During(AwaitingReportGeneration,
            When(ReportGenerated)
                .Then(context => {
                    // 最终结果已生成,可以更新WorkSession的FinalResult
                })
                .Finalize()
        );
    }
}

这个状态机清晰地定义了整个工作流的步骤、状态转换和触发条件。进程管理器持有工作流的状态( ReportGenerationSagaState ),并监听相关事件,在适当的时候发出新的命令来驱动流程向前。

6.2 进程管理器的持久化与容错

MassTransit会自动将Saga的状态持久化到配置的存储中(如Redis, PostgreSQL, MongoDB)。这意味着即使服务重启,未完成的工作流也能从上次中断的状态恢复。这是构建可靠的长时运行AI工作流的关键。

注意事项 :在设计Saga时,要特别注意 补偿事务 。如果工作流中的某一步失败(例如AI调用超时),你需要有回滚或补偿机制。例如,在“生成报告”失败后,你可能需要发布一个 CleanupTempDataCommand 。这通常通过监听失败事件或设置超时器来实现。

7. 部署、监控与性能考量

将这样一个基于CQRS和事件驱动的系统部署到生产环境,需要额外的考虑。

7.1 部署拓扑

建议将不同的组件部署为独立的微服务或至少是独立的进程,以实现独立伸缩:

  • API网关服务 :处理HTTP请求,发送命令和查询。
  • 命令处理服务 :运行MediatR命令处理器和领域逻辑。
  • 事件处理服务 :运行MassTransit消费者,处理AI调用和业务逻辑。
  • 查询服务 :专门处理查询请求,连接只读数据库副本。
  • 进程管理器服务 :运行Saga状态机实例。

这些服务通过消息总线(RabbitMQ/Kafka)和数据库进行通信。数据库层面,需要设置主从复制,将写操作指向主库,读操作指向从库。

7.2 可观测性

分布式系统的调试离不开强大的可观测性。

  • 日志 :结构化日志(如Serilog + Seq/ELK)。在每个命令、事件、AI调用的边界记录日志,并包含唯一的 CorrelationId (通常是 WorkSessionId ),以便追踪整个工作流。
  • 指标 :使用Prometheus和Grafana监控关键指标:命令/查询的吞吐量与延迟、AI API的调用次数与延迟、消息队列的积压情况、各服务的内存/CPU使用率。
  • 分布式追踪 :使用OpenTelemetry将跨服务的调用链串联起来,可视化一个用户请求从API网关到命令处理,再到AI调用和事件处理的完整路径。

7.3 性能与伸缩性要点

  1. 命令端的异步非阻塞 :确保所有I/O操作(数据库、AI调用、消息发布)都是异步的,避免阻塞线程池线程。
  2. 查询端的缓存策略 :针对不同数据特点采用多级缓存。会话元信息(如状态、进度)可以缓存在Redis中并设置较短TTL。历史对话记录可以分页缓存。静态的、通用的AI回答可以缓存更长时间。
  3. AI调用优化
    • 批处理 :如果可能,将多个小的、独立的Prompt合并成一个批处理请求发送给AI API,以减少网络往返和利用Token效率。
    • 流式响应 :对于生成长文本的场景,使用OpenAI的流式响应(streaming),并将内容通过SignalR分块推送给前端,提升用户体验。
    • 速率限制与退避 :严格遵守AI服务的速率限制,实现带指数退避的智能重试机制。
  4. 消息总线配置 :根据事件类型设置不同的队列和交换器。高优先级的事件(如用户实时交互)使用独立队列,确保低延迟。批量处理的事件可以使用工作队列模式。

8. 常见问题与排查技巧实录

在实际开发和运维中,我遇到了不少典型问题,这里分享一些排查思路和解决方案。

问题现象 可能原因 排查步骤与解决方案
用户发送命令后,长时间无响应,查询状态一直是“Running”。 1. 命令事件未发布。
2. 事件处理器消费失败。
3. AI服务调用超时或失败。
4. 进程管理器状态卡住。
1. 检查日志 :查看命令处理器日志,确认 AnalysisStartedEvent 是否成功发布。使用消息队列的管理界面查看对应队列是否有消息堆积。
2. 追踪事件 :通过 CorrelationId 在分布式追踪系统中查看事件流在哪里中断。
3. 检查AI服务 :查看AI服务调用的日志和指标,是否有429(限速)或5xx错误。检查Prompt是否构造正确,Token是否超限。
4. 检查Saga状态 :查询Saga状态存储(如数据库中的 ReportGenerationSagaState 表),看当前状态是否与预期一致。
查询端返回的数据不是最新的。 1. 数据库主从同步延迟。
2. 查询缓存未及时失效。
1. 监控复制延迟 :监控数据库的复制延迟指标。对于一致性要求极高的查询,可以考虑“写后读”模式,即命令处理完成后,将关键数据写入一个快速缓存(如Redis),查询端优先读缓存。
2. 精细化缓存失效 :在命令处理器中,当聚合根状态改变时,除了发布事件,还应主动使相关缓存失效。例如,在 WorkSession 状态更新后,立即删除Redis中该会话的进度缓存键。
AI返回的响应格式不符合预期,导致后续流程解析失败。 Prompt指令不清晰,AI输出不稳定。 1. 强化Prompt工程 :在Prompt中使用更明确的指令,例如“请严格按照以下JSON格式输出:...”。使用OpenAI的 response_format 参数强制JSON输出。
2. 增加响应验证与重试 :在 IAIService 中,对AI返回的内容进行强验证。如果解析失败,可以尝试用另一个更严格的Prompt让AI修正输出,或者记录错误并转入人工处理流程。
进程管理器进入无法跳出的状态(死锁)。 Saga状态机设计有缺陷,某个预期事件永远无法发生。 1. 设计时加入超时处理 :为每个等待状态(如 AwaitingUserConfirmation )设置超时事件。超时后,可以发布一个补偿命令,并将Saga状态置为“超时失败”,通知用户。
2. 添加管理控制台 :构建一个内部管理界面,可以查看所有运行中的Saga实例及其状态,并允许管理员手动干预(如强制发布某个事件或重置状态)。
在高并发下,消息队列出现大量积压。 事件处理器的处理速度跟不上命令的生成速度,尤其是AI调用成为瓶颈。 1. 横向扩展事件处理器 :增加事件处理服务(消费者)的实例数量。
2. 优化AI调用 :如前所述,考虑批处理、使用更快的模型(如 gpt-3.5-turbo )、或引入请求队列在服务内部进行限流和调度。
3. 优先级队列 :将实时性要求不高的事件(如日志记录、数据分析)路由到低优先级队列,确保核心业务事件优先处理。

实操心得 :在开发初期,就应投入精力搭建好结构化的日志和分布式追踪。当问题发生时,能够通过一个 SessionId 快速拉取到跨所有服务的相关日志和追踪信息,是快速定位问题的关键。另外,对于AI集成项目,一定要对第三方API的失败有充分的预案,设计降级策略(例如,缓存旧答案、返回友好提示、转入人工队列等)。

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐