FastGPT源码解析 Agent 中检索增强RAG 处理逻辑和代码分析
FastGPT 的 RAG (Retrieval-Augmented Generation) 系统采用架构,通过查询扩展、多模式检索、重排序融合等技术,实现高质量的知识检索和生成。
·
FastGPT RAG 处理逻辑和代码分析
RAG 核心架构概览
FastGPT 的 RAG (Retrieval-Augmented Generation) 系统采用多阶段检索增强架构,通过查询扩展、多模式检索、重排序融合等技术,实现高质量的知识检索和生成。
1. RAG 处理流程
整体流程图
用户查询 → 查询扩展 → 多模式检索 → 结果融合 → 重排序 → 上下文注入 → LLM生成
↓ ↓ ↓ ↓ ↓ ↓ ↓
原始问题 扩展问题 向量+全文 RRF融合 ReRank 提示词构建 最终回答
核心入口函数
// 文件: packages/service/core/workflow/dispatch/dataset/search.ts
export async function dispatchDatasetSearch(
props: DatasetSearchProps
): Promise<DatasetSearchResponse> {
// 1. 参数验证和预处理
const { datasets, similarity, limit, userChatInput, searchMode } = props.params;
// 2. 权限过滤
const datasetIds = authTmbId ?
await filterDatasetsByTmbId({ datasetIds, tmbId }) :
datasets.map(item => item.datasetId);
// 3. 获取向量模型
const vectorModel = getEmbeddingModel(
(await MongoDataset.findById(datasets[0].datasetId, 'vectorModel').lean())?.vectorModel
);
// 4. 构建搜索参数
const searchData = {
histories, teamId, reRankQuery: userChatInput,
queries: [userChatInput], model: vectorModel.model,
similarity, limit, datasetIds, searchMode,
usingReRank: usingReRank && (await checkTeamReRankPermission(teamId))
};
// 5. 执行检索 (深度检索 vs 标准检索)
const { searchRes, tokens, queryExtensionResult } = datasetDeepSearch ?
await deepRagSearch({ ...searchData, datasetDeepSearchModel }) :
await defaultSearchDatasetData({
...searchData,
datasetSearchUsingExtensionQuery,
datasetSearchExtensionModel
});
// 6. 返回结果
return {
quoteQA: searchRes,
nodeResponse: { totalPoints, query: userChatInput, searchRes },
nodeDispatchUsages,
toolResponses: searchRes.map(item => ({
sourceName: item.sourceName,
content: `${item.q}\n${item.a}`.trim()
}))
};
}
2. 查询扩展 (Query Extension)
查询扩展核心逻辑
// 文件: packages/service/core/dataset/search/utils.ts
export const datasetSearchQueryExtension = async ({
query, extensionModel, extensionBg, histories
}) => {
// 1. 去重处理
const filterSamQuery = (queries: string[]) => {
const set = new Set<string>();
return queries.filter(item => {
const str = hashStr(item.replace(/[^\p{L}\p{N}]/gu, ''));
if (set.has(str)) return false;
set.add(str);
return true;
});
};
// 2. 检查是否已扩展
let { queries, rewriteQuery, alreadyExtension } = (() => {
let rewriteQuery = histories.length > 0 ?
`${histories.map(item =>
`${item.obj}: ${chatValue2RuntimePrompt(item.value).text}`
).join('\n')}\nHuman: ${query}\n` : query;
// 尝试解析JSON格式的扩展查询
try {
const jsonParse = JSON.parse(query);
const queries = Array.isArray(jsonParse) ? filterSamQuery(jsonParse) : [query];
return {
queries,
rewriteQuery: Array.isArray(jsonParse) ? queries.join('\n') : rewriteQuery,
alreadyExtension: Array.isArray(jsonParse)
};
} catch {
return { queries: [query], rewriteQuery, alreadyExtension: false };
}
})();
// 3. AI 查询扩展
const aiExtensionResult = await (async () => {
if (!extensionModel || alreadyExtension) return;
const result = await queryExtension({
chatBg: extensionBg,
query,
histories,
model: extensionModel.model
});
return result.extensionQueries?.length > 0 ? result : undefined;
})();
// 4. 合并扩展查询
const extensionQueries = filterSamQuery(aiExtensionResult?.extensionQueries || []);
if (aiExtensionResult) {
queries = filterSamQuery(queries.concat(extensionQueries));
rewriteQuery = queries.join('\n');
}
return { extensionQueries, concatQueries: queries, rewriteQuery, aiExtensionResult };
};
AI 查询扩展实现
// 文件: packages/service/core/ai/functions/queryExtension.ts
export const queryExtension = async ({
chatBg, query, histories, model
}) => {
// 1. 构建上下文
const systemFewShot = chatBg ? `user: 对话背景。\nassistant: ${chatBg}\n` : '';
const modelData = getLLMModel(model);
const filterHistories = await filterGPTMessageByMaxContext({
messages: chats2GPTMessages({ messages: histories }),
maxContext: modelData.maxContext - 1000
});
// 2. 构建历史对话
const historyFewShot = filterHistories
.map(item => {
if ((item.role === 'user' || item.role === 'assistant') && item.content) {
return `${item.role}: ${typeof item.content === 'string' ?
item.content :
item.content.map(c => c.type === 'text' ? c.text : '').join('\n')}`;
}
})
.filter(Boolean)
.join('\n');
// 3. 使用预设提示词进行扩展
const messages = [{
role: 'user',
content: replaceVariable(defaultPrompt, {
query: `${query}`,
histories: `${systemFewShot}${historyFewShot}`.trim() || 'null'
})
}];
// 4. 调用 LLM 生成扩展查询
const { response: result } = await createChatCompletion({
body: llmCompletionsBodyFormat({
stream: false,
model: modelData.model,
temperature: 0.1,
messages
}, modelData)
});
// 5. 解析 JSON 结果
let answer = result.choices?.[0]?.message?.content || '';
const start = answer.indexOf('[');
const end = answer.lastIndexOf(']');
if (start === -1 || end === -1) return { extensionQueries: [] };
const jsonStr = answer.substring(start, end + 1)
.replace(/(\n|\\)/g, '')
.replace(/ /g, '');
try {
const queries = json5.parse(jsonStr) as string[];
return {
rawQuery: query,
extensionQueries: (Array.isArray(queries) ? queries : []).slice(0, 5),
model,
inputTokens: await countGptMessagesTokens(messages),
outputTokens: await countPromptTokens(answer)
};
} catch (error) {
return { extensionQueries: [] };
}
};
查询扩展提示词模板
const defaultPrompt = `## 你的任务
你作为一个向量检索助手,你的任务是结合历史记录,从不同角度,为"原问题"生成个不同版本的"检索词",从而提高向量检索的语义丰富度,提高向量检索的精度。
生成的问题要求指向对象清晰明确,并与"原问题语言相同"。
## 参考示例
历史记录:
"""
null
"""
原问题: 介绍下剧情。
检索词: ["介绍下故事的背景。","故事的主题是什么?","介绍下故事的主要人物。"]
----------------
历史记录:
"""
user: 对话背景。
assistant: 当前对话是关于 Nginx 的介绍和使用等。
"""
原问题: 怎么下载
检索词: ["Nginx 如何下载?","下载 Nginx 需要什么条件?","有哪些渠道可以下载 Nginx?"]
## 输出要求
1. 输出格式为 JSON 数组,数组中每个元素为字符串。无需对输出进行任何解释。
2. 输出语言与原问题相同。原问题为中文则输出中文;原问题为英文则输出英文。
## 开始任务
历史记录:
"""
{{histories}}
"""
原问题: {{query}}
检索词: `;
3. 多模式检索
检索模式配置
// 文件: packages/service/core/dataset/search/controller.ts
const countRecallLimit = () => {
if (searchMode === DatasetSearchModeEnum.embedding) {
return { embeddingLimit: 100, fullTextLimit: 0 };
}
if (searchMode === DatasetSearchModeEnum.fullTextRecall) {
return { embeddingLimit: 0, fullTextLimit: 100 };
}
// 混合模式
return { embeddingLimit: 80, fullTextLimit: 60 };
};
向量检索实现
const embeddingRecall = async ({ query, limit }) => {
// 1. 文本向量化
const { vectors, tokens } = await getVectorsByText({
model: getEmbeddingModel(model),
input: query,
type: 'query'
});
// 2. 向量数据库检索
const { results } = await recallFromVectorStore({
teamId, datasetIds,
vector: vectors[0],
limit,
forbidCollectionIdList,
filterCollectionIdList
});
// 3. 关联数据查询
const [dataList, collections] = await Promise.all([
MongoDatasetData.find({
teamId,
datasetId: { $in: datasetIds },
'indexes.dataId': { $in: results.map(item => item.id) }
}),
MongoDatasetCollection.find({
_id: { $in: collectionIdList }
})
]);
// 4. 格式化结果
return {
embeddingRecallResults: results.map((item, index) => ({
id: String(data._id),
q: data.q, a: data.a,
score: [{ type: SearchScoreTypeEnum.embedding, value: item.score, index }],
...getCollectionSourceData(collection)
})),
tokens
};
};
全文检索实现
const fullTextRecall = async ({ query, limit }) => {
// MongoDB 全文索引检索
const searchResults = await MongoDatasetDataText.aggregate([
{
$match: {
teamId: new Types.ObjectId(teamId),
datasetId: { $in: datasetIds.map(id => new Types.ObjectId(id)) },
$text: { $search: jiebaSplit({ text: query }) } // 中文分词
}
},
{ $sort: { score: { $meta: 'textScore' } } },
{ $limit: limit }
]);
return {
fullTextRecallResults: searchResults.map((item, index) => ({
id: String(data._id),
q: data.q, a: data.a,
score: [{ type: SearchScoreTypeEnum.fullText, value: item.score, index }],
...getCollectionSourceData(collection)
}))
};
};
4. RRF 融合算法
RRF (Reciprocal Rank Fusion) 实现
// 文件: packages/global/core/dataset/search/utils.ts
export const datasetSearchResultConcat = (
arr: { k: number; list: SearchDataResponseItemType[] }[]
): SearchDataResponseItemType[] => {
const map = new Map<string, SearchDataResponseItemType & { rrfScore: number }>();
// RRF 算法核心
arr.forEach(({ k, list }) => {
list.forEach((data, index) => {
const rank = index + 1;
const score = 1 / (k + rank); // RRF 公式
const record = map.get(data.id);
if (record) {
// 合并分数,相同类型取最大值
const concatScore = [...record.score];
for (const dataItem of data.score) {
const sameScore = concatScore.find(item => item.type === dataItem.type);
if (sameScore) {
sameScore.value = Math.max(sameScore.value, dataItem.value);
} else {
concatScore.push(dataItem);
}
}
map.set(data.id, {
...record,
score: concatScore,
rrfScore: record.rrfScore + score
});
} else {
map.set(data.id, { ...data, rrfScore: score });
}
});
});
// 按 RRF 分数排序
return Array.from(map.values())
.sort((a, b) => b.rrfScore - a.rrfScore)
.map((item, index) => {
// 添加 RRF 分数到结果中
item.score.push({
type: SearchScoreTypeEnum.rrf,
value: item.rrfScore,
index
});
delete item.rrfScore;
return item;
});
};
多路检索融合
// 文件: packages/service/core/dataset/search/controller.ts
const multiQueryRecall = async ({ embeddingLimit, fullTextLimit }) => {
const embeddingRecallResList: SearchDataResponseItemType[][] = [];
const fullTextRecallResList: SearchDataResponseItemType[][] = [];
// 并行执行多查询检索
await Promise.all(
queries.map(async (query) => {
const [{ embeddingRecallResults }, { fullTextRecallResults }] =
await Promise.all([
embeddingRecall({ query, limit: embeddingLimit }),
fullTextRecall({ query, limit: fullTextLimit })
]);
embeddingRecallResList.push(embeddingRecallResults);
fullTextRecallResList.push(fullTextRecallResults);
})
);
// RRF 融合多查询结果
const rrfEmbRecall = datasetSearchResultConcat(
embeddingRecallResList.map(list => ({ k: 60, list }))
).slice(0, embeddingLimit);
const rrfFTRecall = datasetSearchResultConcat(
fullTextRecallResList.map(list => ({ k: 60, list }))
).slice(0, fullTextLimit);
return { embeddingRecallResults: rrfEmbRecall, fullTextRecallResults: rrfFTRecall };
};
5. 重排序 (ReRank)
ReRank 处理
// 文件: packages/service/core/dataset/search/controller.ts
const reRankResults = await (async () => {
if (!usingReRank) return [];
// 合并向量和全文检索结果
const concatRecallResults = embeddingRecallResults.concat(
fullTextRecallResults.filter(item => !embeddingSet.has(item.id))
);
// 去重处理
const filterSameDataResults = concatRecallResults.filter(item => {
const str = hashStr(`${item.q}${item.a}`.replace(/[^\p{L}\p{N}]/gu, ''));
if (set.has(str)) return false;
set.add(str);
return true;
});
// 调用 ReRank 模型
return await datasetDataReRank({
query: reRankQuery,
data: filterSameDataResults
});
})();
// 最终 RRF 融合
const rrfConcatResults = datasetSearchResultConcat([
{ k: 60, list: embeddingRecallResults },
{ k: 60, list: fullTextRecallResults },
{ k: 58, list: reRankResults }
]);
6. 相似度过滤和 Token 限制
相似度过滤
const scoreFilter = (() => {
if (usingReRank) {
return filterSameDataResults.filter(item => {
const reRankScore = item.score.find(s => s.type === SearchScoreTypeEnum.reRank);
return !reRankScore || reRankScore.value >= similarity;
});
}
if (searchMode === DatasetSearchModeEnum.embedding) {
return filterSameDataResults.filter(item => {
const embeddingScore = item.score.find(s => s.type === SearchScoreTypeEnum.embedding);
return !embeddingScore || embeddingScore.value >= similarity;
});
}
return filterSameDataResults;
})();
Token 限制处理
export const filterDatasetDataByMaxTokens = async (
data: SearchDataResponseItemType[],
maxTokens: number
) => {
// 计算每个结果的 Token 数
const tokensScoreFilter = await Promise.all(
data.map(async (item) => ({
...item,
tokens: await countPromptTokens(item.q + item.a)
}))
);
const results: SearchDataResponseItemType[] = [];
let totalTokens = 0;
// 累计 Token 直到达到限制
for (const item of tokensScoreFilter) {
totalTokens += item.tokens;
if (totalTokens > maxTokens + 500) break;
results.push(item);
if (totalTokens > maxTokens) break;
}
return results.length === 0 ? data.slice(0, 1) : results;
};
7. 上下文注入
知识库引用处理
// 文件: packages/service/core/workflow/dispatch/chat/oneapi.ts
const filterDatasetQuote = async ({ quoteQA, model, quoteTemplate }) => {
// 根据模板格式化引用内容
function getValue(item: SearchDataResponseItemType, index: number) {
return replaceVariable(quoteTemplate || Prompt_QuoteTemplateList[0].value, {
id: item.id,
q: item.q,
a: item.a,
updateTime: formatTime2YMDHM(item.updateTime),
source: item.sourceName,
sourceId: String(item.sourceId || ''),
index: index + 1
});
}
// 按 Token 限制过滤
const filterQuoteQA = await filterSearchResultsByMaxChars(quoteQA, model.quoteMaxToken);
const datasetQuoteText = filterQuoteQA.length > 0 ?
`${filterQuoteQA.map((item, index) => getValue(item, index).trim()).join('\n------\n')}` :
'';
return { datasetQuoteText };
};
系统提示词构建
const getChatMessages = async ({ datasetQuoteText, systemPrompt, userChatInput }) => {
// 构建完整的系统提示词
const concatenateSystemPrompt = [
model.defaultSystemChatPrompt,
systemPrompt,
datasetQuoteText ? replaceVariable(datasetQuotePromptTemplate, {
quote: datasetQuoteText
}) : ''
].filter(Boolean).join('\n\n===---===---===\n\n');
const messages: ChatItemType[] = [
...getSystemPrompt_ChatItemType(concatenateSystemPrompt),
...histories,
{
obj: ChatRoleEnum.Human,
value: runtimePrompt2ChatsValue({ text: userChatInput })
}
];
return { filterMessages: await filterGPTMessageByMaxContext({ messages }) };
};
8. 深度 RAG 搜索
Deep RAG 实现
// 支持迭代式深度搜索
export const deepRagSearch = async ({
datasetDeepSearchModel,
datasetDeepSearchMaxTimes = 3,
datasetDeepSearchBg,
...searchData
}) => {
let currentQuery = searchData.reRankQuery;
let allResults: SearchDataResponseItemType[] = [];
let iterations = 0;
while (iterations < datasetDeepSearchMaxTimes) {
// 执行搜索
const result = await defaultSearchDatasetData({
...searchData,
queries: [currentQuery]
});
// 合并结果
allResults = mergeSearchResults(allResults, result.searchRes);
// 判断是否需要继续搜索
if (shouldStopSearch(result.searchRes, iterations)) break;
// 生成下一轮查询
currentQuery = await generateNextQuery({
originalQuery: searchData.reRankQuery,
currentResults: result.searchRes,
model: datasetDeepSearchModel,
background: datasetDeepSearchBg
});
iterations++;
}
return { searchRes: allResults, deepSearchResult: { iterations } };
};
FastGPT 的 RAG 系统实现了多层次、智能化的检索增强生成:
核心特性
- 查询扩展: AI 驱动的多角度查询生成
- 多模式检索: 向量检索 + 全文检索 + 混合模式
- RRF 融合: 科学的多路检索结果融合算法
- 重排序: 可选的 ReRank 模型提升相关性
- 智能过滤: 相似度阈值 + Token 限制
- 上下文注入: 结构化的知识引用和提示词构建
- 深度搜索: 迭代式多轮检索优化
技术优势
- 高召回率: 多查询扩展 + 多模式检索
- 高精确度: ReRank 重排序 + 相似度过滤
- 高效融合: RRF 算法科学融合多路结果
- 智能控制: Token 限制 + 动态截断
- 可扩展性: 模块化设计支持新检索模式
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)