背景

最近在做大模型的项目,算法部门提供的文档解析接口, 并发度为1, 业务这边需要在ai问答和上传文档时进行解析和向量化,文档解析只能单线程跑,问答的文档解析需要高优先级处理。

采用 rocketmq 做文档上传和解析的解耦(项目背景在,无法用其它 mq 替换)。

由于rocketmq不支持优先级队列,需要自己实现优先级队列的效果。

基本思路

创建两个 topic 和两个 group, 分别对应高优先级任务(HighPriorityTopic)和低优先级任务(LowPriorityTopic)。
采用 pull 模式,手动拉取消息,如果 HighPriorityTopic 拉取为空,再去 LowPriorityTopic 拉取消息。

结果

在两个消息队列都为空的时候,上传文档,解析任务会延迟大概5min才消费。
原因是 dev环境文档解析大概需要2min, 而为了避免消息重复消费,我把 mq 的 invisibleDuration 改到了 5min。
当消息错过这个时间窗口的时候,只能等到下个时间窗口才能消费。

解决办法

高优先级队列 push 模式, 低优先级队列 pull 模式。采用 AtomicInteger 做状态判断。
测试结果mq 消息消费几乎没延迟。

代码

private void consumeMessage(ClientConfiguration clientConfiguration) throws ClientException, InterruptedException {
        // 创建并初始化消费者
        FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);

        // 高优先级消费 push 模式
        clientServiceProvider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(docParseHighPriorityGroup)
            .setSubscriptionExpressions(Map.of(docParseHighPriorityTopic, filterExpression))
            .setMessageListener(messageView -> {
                highPriorityTaskCount.incrementAndGet();
                try {
                    log.info("[文档解析] 消费高优先级解析任务:topic = {}, messageId = {}", messageView.getTopic(), messageView.getMessageId());
                    this.dealMessage(messageView);
                    return ConsumeResult.SUCCESS;
                } finally {
                    highPriorityTaskCount.decrementAndGet();
                }
            })
            .build();

        // 低优先级消费 pull 模式
        SimpleConsumer lowPriorityConsumer = clientServiceProvider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(docParseGroup)
            .setAwaitDuration(Duration.ofSeconds(10))
            .setSubscriptionExpressions(Map.of(docParseTaskTopic, filterExpression))
            .build();

        int maxMessageNum = 1;
        Duration invisibleDuration = Duration.ofMinutes(5);

        while (true) {
            if (highPriorityTaskCount.get() == 0) { // 高优先级没消息了才开始消费
                List<MessageView> receive = lowPriorityConsumer.receive(maxMessageNum, invisibleDuration);
                log.info("[文档解析] 消费低优先级解析任务,收到消息数量:{}", receive.size());
                this.handleMessages(lowPriorityConsumer, receive);
            } else {
                Thread.sleep(1_000);
            }
        }
    }
Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐