FastGPT RAG 处理逻辑和代码分析

RAG 核心架构概览

FastGPT 的 RAG (Retrieval-Augmented Generation) 系统采用多阶段检索增强架构,通过查询扩展、多模式检索、重排序融合等技术,实现高质量的知识检索和生成。

1. RAG 处理流程

整体流程图

用户查询 → 查询扩展 → 多模式检索 → 结果融合 → 重排序 → 上下文注入 → LLM生成
    ↓         ↓         ↓         ↓         ↓         ↓         ↓
  原始问题   扩展问题   向量+全文   RRF融合   ReRank   提示词构建  最终回答

核心入口函数

// 文件: packages/service/core/workflow/dispatch/dataset/search.ts
export async function dispatchDatasetSearch(
  props: DatasetSearchProps
): Promise<DatasetSearchResponse> {
  
  // 1. 参数验证和预处理
  const { datasets, similarity, limit, userChatInput, searchMode } = props.params;
  
  // 2. 权限过滤
  const datasetIds = authTmbId ? 
    await filterDatasetsByTmbId({ datasetIds, tmbId }) : 
    datasets.map(item => item.datasetId);
  
  // 3. 获取向量模型
  const vectorModel = getEmbeddingModel(
    (await MongoDataset.findById(datasets[0].datasetId, 'vectorModel').lean())?.vectorModel
  );
  
  // 4. 构建搜索参数
  const searchData = {
    histories, teamId, reRankQuery: userChatInput,
    queries: [userChatInput], model: vectorModel.model,
    similarity, limit, datasetIds, searchMode,
    usingReRank: usingReRank && (await checkTeamReRankPermission(teamId))
  };
  
  // 5. 执行检索 (深度检索 vs 标准检索)
  const { searchRes, tokens, queryExtensionResult } = datasetDeepSearch ?
    await deepRagSearch({ ...searchData, datasetDeepSearchModel }) :
    await defaultSearchDatasetData({ 
      ...searchData, 
      datasetSearchUsingExtensionQuery,
      datasetSearchExtensionModel 
    });
  
  // 6. 返回结果
  return {
    quoteQA: searchRes,
    nodeResponse: { totalPoints, query: userChatInput, searchRes },
    nodeDispatchUsages,
    toolResponses: searchRes.map(item => ({
      sourceName: item.sourceName,
      content: `${item.q}\n${item.a}`.trim()
    }))
  };
}

2. 查询扩展 (Query Extension)

查询扩展核心逻辑

// 文件: packages/service/core/dataset/search/utils.ts
export const datasetSearchQueryExtension = async ({
  query, extensionModel, extensionBg, histories
}) => {
  // 1. 去重处理
  const filterSamQuery = (queries: string[]) => {
    const set = new Set<string>();
    return queries.filter(item => {
      const str = hashStr(item.replace(/[^\p{L}\p{N}]/gu, ''));
      if (set.has(str)) return false;
      set.add(str);
      return true;
    });
  };
  
  // 2. 检查是否已扩展
  let { queries, rewriteQuery, alreadyExtension } = (() => {
    let rewriteQuery = histories.length > 0 ?
      `${histories.map(item => 
        `${item.obj}: ${chatValue2RuntimePrompt(item.value).text}`
      ).join('\n')}\nHuman: ${query}\n` : query;
    
    // 尝试解析JSON格式的扩展查询
    try {
      const jsonParse = JSON.parse(query);
      const queries = Array.isArray(jsonParse) ? filterSamQuery(jsonParse) : [query];
      return {
        queries,
        rewriteQuery: Array.isArray(jsonParse) ? queries.join('\n') : rewriteQuery,
        alreadyExtension: Array.isArray(jsonParse)
      };
    } catch {
      return { queries: [query], rewriteQuery, alreadyExtension: false };
    }
  })();
  
  // 3. AI 查询扩展
  const aiExtensionResult = await (async () => {
    if (!extensionModel || alreadyExtension) return;
    
    const result = await queryExtension({
      chatBg: extensionBg,
      query,
      histories,
      model: extensionModel.model
    });
    
    return result.extensionQueries?.length > 0 ? result : undefined;
  })();
  
  // 4. 合并扩展查询
  const extensionQueries = filterSamQuery(aiExtensionResult?.extensionQueries || []);
  if (aiExtensionResult) {
    queries = filterSamQuery(queries.concat(extensionQueries));
    rewriteQuery = queries.join('\n');
  }
  
  return { extensionQueries, concatQueries: queries, rewriteQuery, aiExtensionResult };
};

AI 查询扩展实现

// 文件: packages/service/core/ai/functions/queryExtension.ts
export const queryExtension = async ({
  chatBg, query, histories, model
}) => {
  // 1. 构建上下文
  const systemFewShot = chatBg ? `user: 对话背景。\nassistant: ${chatBg}\n` : '';
  
  const modelData = getLLMModel(model);
  const filterHistories = await filterGPTMessageByMaxContext({
    messages: chats2GPTMessages({ messages: histories }),
    maxContext: modelData.maxContext - 1000
  });
  
  // 2. 构建历史对话
  const historyFewShot = filterHistories
    .map(item => {
      if ((item.role === 'user' || item.role === 'assistant') && item.content) {
        return `${item.role}: ${typeof item.content === 'string' ? 
          item.content : 
          item.content.map(c => c.type === 'text' ? c.text : '').join('\n')}`;
      }
    })
    .filter(Boolean)
    .join('\n');
  
  // 3. 使用预设提示词进行扩展
  const messages = [{
    role: 'user',
    content: replaceVariable(defaultPrompt, {
      query: `${query}`,
      histories: `${systemFewShot}${historyFewShot}`.trim() || 'null'
    })
  }];
  
  // 4. 调用 LLM 生成扩展查询
  const { response: result } = await createChatCompletion({
    body: llmCompletionsBodyFormat({
      stream: false,
      model: modelData.model,
      temperature: 0.1,
      messages
    }, modelData)
  });
  
  // 5. 解析 JSON 结果
  let answer = result.choices?.[0]?.message?.content || '';
  const start = answer.indexOf('[');
  const end = answer.lastIndexOf(']');
  
  if (start === -1 || end === -1) return { extensionQueries: [] };
  
  const jsonStr = answer.substring(start, end + 1)
    .replace(/(\n|\\)/g, '')
    .replace(/  /g, '');
  
  try {
    const queries = json5.parse(jsonStr) as string[];
    return {
      rawQuery: query,
      extensionQueries: (Array.isArray(queries) ? queries : []).slice(0, 5),
      model,
      inputTokens: await countGptMessagesTokens(messages),
      outputTokens: await countPromptTokens(answer)
    };
  } catch (error) {
    return { extensionQueries: [] };
  }
};

查询扩展提示词模板

const defaultPrompt = `## 你的任务
你作为一个向量检索助手,你的任务是结合历史记录,从不同角度,为"原问题"生成个不同版本的"检索词",从而提高向量检索的语义丰富度,提高向量检索的精度。
生成的问题要求指向对象清晰明确,并与"原问题语言相同"。

## 参考示例

历史记录: 
"""
null
"""
原问题: 介绍下剧情。
检索词: ["介绍下故事的背景。","故事的主题是什么?","介绍下故事的主要人物。"]
----------------
历史记录: 
"""
user: 对话背景。
assistant: 当前对话是关于 Nginx 的介绍和使用等。
"""
原问题: 怎么下载
检索词: ["Nginx 如何下载?","下载 Nginx 需要什么条件?","有哪些渠道可以下载 Nginx?"]

## 输出要求
1. 输出格式为 JSON 数组,数组中每个元素为字符串。无需对输出进行任何解释。
2. 输出语言与原问题相同。原问题为中文则输出中文;原问题为英文则输出英文。

## 开始任务
历史记录:
"""
{{histories}}
"""
原问题: {{query}}
检索词: `;

3. 多模式检索

检索模式配置

// 文件: packages/service/core/dataset/search/controller.ts
const countRecallLimit = () => {
  if (searchMode === DatasetSearchModeEnum.embedding) {
    return { embeddingLimit: 100, fullTextLimit: 0 };
  }
  if (searchMode === DatasetSearchModeEnum.fullTextRecall) {
    return { embeddingLimit: 0, fullTextLimit: 100 };
  }
  // 混合模式
  return { embeddingLimit: 80, fullTextLimit: 60 };
};

向量检索实现

const embeddingRecall = async ({ query, limit }) => {
  // 1. 文本向量化
  const { vectors, tokens } = await getVectorsByText({
    model: getEmbeddingModel(model),
    input: query,
    type: 'query'
  });
  
  // 2. 向量数据库检索
  const { results } = await recallFromVectorStore({
    teamId, datasetIds,
    vector: vectors[0],
    limit,
    forbidCollectionIdList,
    filterCollectionIdList
  });
  
  // 3. 关联数据查询
  const [dataList, collections] = await Promise.all([
    MongoDatasetData.find({
      teamId,
      datasetId: { $in: datasetIds },
      'indexes.dataId': { $in: results.map(item => item.id) }
    }),
    MongoDatasetCollection.find({
      _id: { $in: collectionIdList }
    })
  ]);
  
  // 4. 格式化结果
  return {
    embeddingRecallResults: results.map((item, index) => ({
      id: String(data._id),
      q: data.q, a: data.a,
      score: [{ type: SearchScoreTypeEnum.embedding, value: item.score, index }],
      ...getCollectionSourceData(collection)
    })),
    tokens
  };
};

全文检索实现

const fullTextRecall = async ({ query, limit }) => {
  // MongoDB 全文索引检索
  const searchResults = await MongoDatasetDataText.aggregate([
    {
      $match: {
        teamId: new Types.ObjectId(teamId),
        datasetId: { $in: datasetIds.map(id => new Types.ObjectId(id)) },
        $text: { $search: jiebaSplit({ text: query }) }  // 中文分词
      }
    },
    { $sort: { score: { $meta: 'textScore' } } },
    { $limit: limit }
  ]);
  
  return {
    fullTextRecallResults: searchResults.map((item, index) => ({
      id: String(data._id),
      q: data.q, a: data.a,
      score: [{ type: SearchScoreTypeEnum.fullText, value: item.score, index }],
      ...getCollectionSourceData(collection)
    }))
  };
};

4. RRF 融合算法

RRF (Reciprocal Rank Fusion) 实现

// 文件: packages/global/core/dataset/search/utils.ts
export const datasetSearchResultConcat = (
  arr: { k: number; list: SearchDataResponseItemType[] }[]
): SearchDataResponseItemType[] => {
  
  const map = new Map<string, SearchDataResponseItemType & { rrfScore: number }>();
  
  // RRF 算法核心
  arr.forEach(({ k, list }) => {
    list.forEach((data, index) => {
      const rank = index + 1;
      const score = 1 / (k + rank);  // RRF 公式
      
      const record = map.get(data.id);
      if (record) {
        // 合并分数,相同类型取最大值
        const concatScore = [...record.score];
        for (const dataItem of data.score) {
          const sameScore = concatScore.find(item => item.type === dataItem.type);
          if (sameScore) {
            sameScore.value = Math.max(sameScore.value, dataItem.value);
          } else {
            concatScore.push(dataItem);
          }
        }
        
        map.set(data.id, {
          ...record,
          score: concatScore,
          rrfScore: record.rrfScore + score
        });
      } else {
        map.set(data.id, { ...data, rrfScore: score });
      }
    });
  });
  
  // 按 RRF 分数排序
  return Array.from(map.values())
    .sort((a, b) => b.rrfScore - a.rrfScore)
    .map((item, index) => {
      // 添加 RRF 分数到结果中
      item.score.push({
        type: SearchScoreTypeEnum.rrf,
        value: item.rrfScore,
        index
      });
      delete item.rrfScore;
      return item;
    });
};

多路检索融合

// 文件: packages/service/core/dataset/search/controller.ts
const multiQueryRecall = async ({ embeddingLimit, fullTextLimit }) => {
  const embeddingRecallResList: SearchDataResponseItemType[][] = [];
  const fullTextRecallResList: SearchDataResponseItemType[][] = [];
  
  // 并行执行多查询检索
  await Promise.all(
    queries.map(async (query) => {
      const [{ embeddingRecallResults }, { fullTextRecallResults }] = 
        await Promise.all([
          embeddingRecall({ query, limit: embeddingLimit }),
          fullTextRecall({ query, limit: fullTextLimit })
        ]);
      
      embeddingRecallResList.push(embeddingRecallResults);
      fullTextRecallResList.push(fullTextRecallResults);
    })
  );
  
  // RRF 融合多查询结果
  const rrfEmbRecall = datasetSearchResultConcat(
    embeddingRecallResList.map(list => ({ k: 60, list }))
  ).slice(0, embeddingLimit);
  
  const rrfFTRecall = datasetSearchResultConcat(
    fullTextRecallResList.map(list => ({ k: 60, list }))
  ).slice(0, fullTextLimit);
  
  return { embeddingRecallResults: rrfEmbRecall, fullTextRecallResults: rrfFTRecall };
};

5. 重排序 (ReRank)

ReRank 处理

// 文件: packages/service/core/dataset/search/controller.ts
const reRankResults = await (async () => {
  if (!usingReRank) return [];
  
  // 合并向量和全文检索结果
  const concatRecallResults = embeddingRecallResults.concat(
    fullTextRecallResults.filter(item => !embeddingSet.has(item.id))
  );
  
  // 去重处理
  const filterSameDataResults = concatRecallResults.filter(item => {
    const str = hashStr(`${item.q}${item.a}`.replace(/[^\p{L}\p{N}]/gu, ''));
    if (set.has(str)) return false;
    set.add(str);
    return true;
  });
  
  // 调用 ReRank 模型
  return await datasetDataReRank({
    query: reRankQuery,
    data: filterSameDataResults
  });
})();

// 最终 RRF 融合
const rrfConcatResults = datasetSearchResultConcat([
  { k: 60, list: embeddingRecallResults },
  { k: 60, list: fullTextRecallResults },
  { k: 58, list: reRankResults }
]);

6. 相似度过滤和 Token 限制

相似度过滤

const scoreFilter = (() => {
  if (usingReRank) {
    return filterSameDataResults.filter(item => {
      const reRankScore = item.score.find(s => s.type === SearchScoreTypeEnum.reRank);
      return !reRankScore || reRankScore.value >= similarity;
    });
  }
  
  if (searchMode === DatasetSearchModeEnum.embedding) {
    return filterSameDataResults.filter(item => {
      const embeddingScore = item.score.find(s => s.type === SearchScoreTypeEnum.embedding);
      return !embeddingScore || embeddingScore.value >= similarity;
    });
  }
  
  return filterSameDataResults;
})();

Token 限制处理

export const filterDatasetDataByMaxTokens = async (
  data: SearchDataResponseItemType[],
  maxTokens: number
) => {
  // 计算每个结果的 Token 数
  const tokensScoreFilter = await Promise.all(
    data.map(async (item) => ({
      ...item,
      tokens: await countPromptTokens(item.q + item.a)
    }))
  );
  
  const results: SearchDataResponseItemType[] = [];
  let totalTokens = 0;
  
  // 累计 Token 直到达到限制
  for (const item of tokensScoreFilter) {
    totalTokens += item.tokens;
    
    if (totalTokens > maxTokens + 500) break;
    
    results.push(item);
    
    if (totalTokens > maxTokens) break;
  }
  
  return results.length === 0 ? data.slice(0, 1) : results;
};

7. 上下文注入

知识库引用处理

// 文件: packages/service/core/workflow/dispatch/chat/oneapi.ts
const filterDatasetQuote = async ({ quoteQA, model, quoteTemplate }) => {
  // 根据模板格式化引用内容
  function getValue(item: SearchDataResponseItemType, index: number) {
    return replaceVariable(quoteTemplate || Prompt_QuoteTemplateList[0].value, {
      id: item.id,
      q: item.q,
      a: item.a,
      updateTime: formatTime2YMDHM(item.updateTime),
      source: item.sourceName,
      sourceId: String(item.sourceId || ''),
      index: index + 1
    });
  }
  
  // 按 Token 限制过滤
  const filterQuoteQA = await filterSearchResultsByMaxChars(quoteQA, model.quoteMaxToken);
  
  const datasetQuoteText = filterQuoteQA.length > 0 ?
    `${filterQuoteQA.map((item, index) => getValue(item, index).trim()).join('\n------\n')}` :
    '';
  
  return { datasetQuoteText };
};

系统提示词构建

const getChatMessages = async ({ datasetQuoteText, systemPrompt, userChatInput }) => {
  // 构建完整的系统提示词
  const concatenateSystemPrompt = [
    model.defaultSystemChatPrompt,
    systemPrompt,
    datasetQuoteText ? replaceVariable(datasetQuotePromptTemplate, {
      quote: datasetQuoteText
    }) : ''
  ].filter(Boolean).join('\n\n===---===---===\n\n');
  
  const messages: ChatItemType[] = [
    ...getSystemPrompt_ChatItemType(concatenateSystemPrompt),
    ...histories,
    {
      obj: ChatRoleEnum.Human,
      value: runtimePrompt2ChatsValue({ text: userChatInput })
    }
  ];
  
  return { filterMessages: await filterGPTMessageByMaxContext({ messages }) };
};

8. 深度 RAG 搜索

Deep RAG 实现

// 支持迭代式深度搜索
export const deepRagSearch = async ({
  datasetDeepSearchModel,
  datasetDeepSearchMaxTimes = 3,
  datasetDeepSearchBg,
  ...searchData
}) => {
  let currentQuery = searchData.reRankQuery;
  let allResults: SearchDataResponseItemType[] = [];
  let iterations = 0;
  
  while (iterations < datasetDeepSearchMaxTimes) {
    // 执行搜索
    const result = await defaultSearchDatasetData({
      ...searchData,
      queries: [currentQuery]
    });
    
    // 合并结果
    allResults = mergeSearchResults(allResults, result.searchRes);
    
    // 判断是否需要继续搜索
    if (shouldStopSearch(result.searchRes, iterations)) break;
    
    // 生成下一轮查询
    currentQuery = await generateNextQuery({
      originalQuery: searchData.reRankQuery,
      currentResults: result.searchRes,
      model: datasetDeepSearchModel,
      background: datasetDeepSearchBg
    });
    
    iterations++;
  }
  
  return { searchRes: allResults, deepSearchResult: { iterations } };
};

FastGPT 的 RAG 系统实现了多层次、智能化的检索增强生成

核心特性

  1. 查询扩展: AI 驱动的多角度查询生成
  2. 多模式检索: 向量检索 + 全文检索 + 混合模式
  3. RRF 融合: 科学的多路检索结果融合算法
  4. 重排序: 可选的 ReRank 模型提升相关性
  5. 智能过滤: 相似度阈值 + Token 限制
  6. 上下文注入: 结构化的知识引用和提示词构建
  7. 深度搜索: 迭代式多轮检索优化

技术优势

  • 高召回率: 多查询扩展 + 多模式检索
  • 高精确度: ReRank 重排序 + 相似度过滤
  • 高效融合: RRF 算法科学融合多路结果
  • 智能控制: Token 限制 + 动态截断
  • 可扩展性: 模块化设计支持新检索模式
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐