适用于LLM初学者和有一定应用经验的朋友。学习调用常规的大语言模型,并进行标注工作。

1 前言

随着大模型应用的普及,一些企业开始尝试将各种场景与大模型进行融合。但是在结合的过程中,当前阶段下,LLM技术依然有些难题无法解决。

1. 知识管理问题

  • 知识来源的权威性
  • 知识更新的及时性
  • 效果保障的责任归属

2. 成本控制问题

  • 高并发场景下的计算成本
  • 投入产出比的平衡

因此,在特定场景下(尤其是高频客服场景),将LLM作为辅助工具来提升传统模型的效果,可能是过渡阶段的最优解决方案。

应用示例:客服机器人优化

以传统客服机器人为例:

  • 现有系统:基于人工配置的QA知识库,使用BERT等预训练模型进行相似度匹配
  • 核心流程:用户提问 → BERT模型匹配相似QA → 按规则返回答案
  • 效果决定因素:BERT模型训练语料的质量

在完全迁移到LLM方案之前,可以利用LLM的强大能力来优化现有模型的训练数据,从而提升整体服务质量,这不失为一个经济且实用的过渡方案。

本文旨在探讨如何利用大语言模型的能力来标注和优化训练数据,从而提升传统NLP模型的效果。文章将通过实际案例,为LLM初学者和有经验的开发者提供一个实用的应用思路。

2 语料生成

在传统的客服机器人场景中,高质量的训练语料对模型效果至关重要。然而,人工编写耗时耗力,而且难以覆盖用户的多样化表达方式。通过利用LLM的自然语言理解和生成能力,可以快速扩充和丰富现有的训练语料库。

实现思路

1. 基础语料准备

  • 收集现有的核心问题集(标准问题)
  • 确保这些问题覆盖主要业务场景
  • 对问题进行分类和整理

2. 提示词设计

  • 明确指示LLM扮演用户角色
  • 要求生成的问法要保持语义一致性
  • 设定合理的变体数量(如每个标准问题生成50个变体)
  • 控制生成的多样性(通过temperature参数)

3. 批量处理流程

  • 读取标准问题集
  • 调用LLM API进行批量生成
  • 结果存储和格式化处理

举例

# 示例提示词模板
prompt_template = """
你擅长语文,擅长站在客户的角度思考
请你根据如下问题,生成50个用户可能提问的不同的问题。
要求:
1. 保持原问题的核心语义不变
2. 模拟真实用户的各种表达方式
3. 考虑不同场景下的表述变化
4. 包含口语化和书面语表达

原问题:{question}
"""

这里其实主要依靠提示词,实践的时候有几个注意点:

(1)格式

大模型生成的格式不一定是我们想要的,可以先观察生成的格式,再处理为需要的。比如大模型直接生成:

  1. 语料A
  2. 语料B
  3. 语料C

会有1、2、3着用的标签,直接存到excel不方便,所以观察到格式后,可以清理下(在下面代码的清理数字编号部分)

(2)效果

让大模型直接生成30已经接近极限,因为大模型很容易输出很多相似的,效果不佳(我们需要的是同样意思但是更加丰富的表达)。这里可以利用提示词,多描述一些场景和例子,或者让大模型一步一步代入用户视角思考每个环节会遇到哪些问题,再生成语料(这部分靠提示词,在本篇不再做赘述)

实践代码如下:

import os
from dashscope import Generation
import dashscope
import pandas as pd

def get_response(messages):
    response = Generation.call(
        api_key='sk-xxxxxxxxxxxxxxxxxxxxxxx',  # 请替换为您的API key
        model="qwen-max",   # 能用最好的就用最好的
        messages=messages,
        result_format="message",
    )
    return response

# 读取Excel文件
df = pd.read_excel('Q补充语料.xlsx', sheet_name='Sheet1', engine='openpyxl')
print("原始数据预览:")
print(df.head())

# 创建结果DataFrame
final_df = pd.DataFrame(columns=['Q', '生成语料'])

# 遍历df中的每一行
for index, row in df.iterrows():
    # 构建消息列表
    messages = [
        {
            "role": "system",
            "content": "你擅长语文,擅长站在客户的角度思考"
        },
        {
            "role": "user",
            "content": f"""
            请你根据如下问题,生成30个用户可能提问的不同的问题。
            要求:
            1. 保持原问题的核心语义不变
            2. 模拟真实用户的各种表达方式
            3. 考虑不同场景下的表述变化
            4. 包含口语化和书面语表达
            每个问题请用数字编号,每行一个问题。


            问题:{row['新增Q']}
            """
        }
    ]
    
    try:
        # 调用API
        response = get_response(messages)
        generated_text = response.output.choices[0].message.content
        
        # 处理返回的问题列表
        questions = [q.strip() for q in generated_text.split('\n') if q.strip()]
        # 清理数字编号
        cleaned_questions = []
        for q in questions:
            # 移除数字编号和点号
            parts = q.split('.')
            if len(parts) > 1:
                cleaned_questions.append(parts[1].strip())
            else:
                cleaned_questions.append(q.strip())
        
        # 创建临时DataFrame
        temp_df = pd.DataFrame({
            'Q': [row['新增Q']] * len(cleaned_questions),
            '生成语料': cleaned_questions
        })
        
        # 将临时DataFrame添加到最终结果中
        final_df = pd.concat([final_df, temp_df], ignore_index=True)
        
        # 打印进度
        print(f"已处理第 {index + 1} 行,共 {len(df)} 行")
        
    except Exception as e:
        print(f"处理第 {index + 1} 行时出错: {str(e)}")
        continue
    
    # 可选:添加延时以避免API限制
    # time.sleep(1)

# 保存结果
output_filename = '生成语料结果2.xlsx'
final_df.to_excel(output_filename, index=False)
print(f"\n结果已保存到文件:{output_filename}")

# 显示结果预览
print("\n生成结果预览:")
print(final_df.head())

看下效果:

效果还行,要更好的效果就需要精心调整下提示词了。

3 语料清洗

在客服场景中,用户的输入往往包含大量噪音数据,如无意义字符、情绪化表达、非业务相关内容等。高质量的训练数据需要经过严格的清洗过程,以确保模型能够学习到有效的语义信息。通过LLM的强大理解能力,我们可以更智能地完成语料清洗工作。

实现思路

1. 清洗规则定义

  • 明确有效语料的标准
  • 识别无效语料的特征

2. LLM辅助判断

  • 设计清晰的评估提示词
  • 利用LLM进行语义理解
  • 输出标准化的判断结果

3. 批量处理流程

  • 读取原始语料数据
  • 调用LLM API进行评估
  • 保存清洗结果

例子

# 示例提示词模板
prompt_template = """
请你为以下语料进行分类:
1. 正常语料:用户的正常问题,应该被解答
2. 无效语料:用户的问题无法被解答,或者与火车票无关,具体如下:
   - 无意义的字符组合
   - 重复性内容
   - 情绪化或攻击性语言
   - 非相关内容
   - 语言不通顺
   - 语言混杂
   - 缺乏上下文
   - 模糊的情绪表达

输入语料: {text}
请只输出0(无效)或1(有效),不要输出其他内容。
"""

这里其实主要也依靠提示词,实践的时候有个注意点:

中断

由于要清洗的量可能很大,可能会因为奇怪的原因中断,所有写好处理,保存中间处理结果

import pandas as pd
import time
from dashscope import Generation

def get_response(messages):
    """调用Qwen API获取响应"""
    try:
        response = Generation.call(
            api_key='sk-xxxxxxxxxxxx',  # 请替换为您的API key
            model="qwen-max",
            messages=messages,
            result_format="message",
        )
        return response
    except Exception as e:
        print(f"API调用出错: {e}")
        return None

def create_prompt(Q):
    """创建提示词"""
    return [
        {
            "role": "system",
            "content": "你是一位专业的数据标注员"
        },
        {
            "role": "user",
            "content": f"""
            请你为以下语料进行分类。
            1 正常语料:用户的正常问题,应该被解答
            2 无效语料:用户的问题无法被解答,或者与火车票无关,具体如下:
              (1)无意义的字符组合:包括纯符号、纯字母、符号和字母的随机组合,或无意义的数字组合。
              (2)重复性内容,连续重复字数大于3个字,且重复部分占据整句内容的50%以上。
              (3)情绪化或攻击性语言
              (4)语句中抱怨、吐槽、辱骂等字眼占大部分,且无法判断用户具体意图或需求。
              (5)非相关内容:内容与火车票服务无关,涉及其他领域或无关话题。
              (6)语言不通顺:语法错误严重,导致无法理解用户意图。
              (7)语言混杂,使用多种语言或方言,导致理解困难。
              (8)缺乏上下文:句子孤立存在,缺乏上下文信息,无法判断用户意图或需求。
              (9)模糊的情绪:无法通过现有信息感知对话内容的情绪。

            注意,这是一个客服场景。
            
            用户语料: {Q}
            
            请只输出0(无效)或1(有效),不要输出其他内容。
            """
        }
    ]

def process_dataframe(df):
    """处理数据框,添加标记并进行分类"""
    # 添加标记列
    df['label'] = None
    
    # 根据字符长度标记
    df.loc[df['userinput'].str.len() < 3, 'label'] = 0
    df.loc[df['userinput'].str.len() > 30, 'label'] = 0
    
    # 获取未标记的行
    unlabeled_rows = df[df['label'].isna()]
    
    # 计数器和时间追踪
    counter = 0
    batch_counter = 0
    start_time = time.time()
    batch_start_time = start_time
    
    # 处理每一行未标记的数据
    for index, row in unlabeled_rows.iterrows():
        try:
            # 创建messages并调用API
            messages = create_prompt(row['userinput'])
            response = get_response(messages)
            
            if response and response.output:
                # 提取结果(0或1)
                result = response.output.choices[0].message.content.strip()
                if result in ['0', '1']:
                    df.loc[index, 'label'] = int(result)
                else:
                    df.loc[index, 'label'] = None
                
                counter += 1
                batch_counter += 1
                
                # 每50次保存一次结果
                if counter % 50 == 0:
                    current_time = time.time()
                    batch_elapsed_time = current_time - batch_start_time
                    minutes = int(batch_elapsed_time // 60)
                    seconds = int(batch_elapsed_time % 60)

                    print(f"已处理 {counter} 条数据")
                    print(f"本批次50条用时:{minutes}分{seconds}秒")
                    print(f"平均每条数据用时:{batch_elapsed_time/batch_counter:.2f}秒")
                    df.to_csv('intermediate_results.csv', index=False)

                    # 重置批次计数器和计时器
                    batch_counter = 0
                    batch_start_time = current_time
                
                # 添加短暂延迟以避免API限制
                time.sleep(0.1)
                
        except Exception as e:
            print(f"处理行 {index} 时出错: {e}")
            continue
    
    return df

def main(start_row=0):
    """主函数"""
    print("开始读取数据...")
    
    df = pd.read_csv('待清洗的语料.csv')
    total_rows = len(df)
    print(f"数据集总行数: {total_rows}")
    
    if start_row > 0:
        if start_row >= total_rows:
            print(f"错误: 起始行 {start_row} 超出了数据集总行数 {total_rows}")
            return
        
        print(f"跳过前 {start_row} 行数据")
        df = df.iloc[start_row:].reset_index(drop=True)

    print("开始处理数据...")
    results_df = process_dataframe(df)
    
    results_df.to_csv('processed_results.csv', index=False)
    print("数据处理完成!")

if __name__ == "__main__":
    main(start_row=59000)    # 这里可以控制从那里开始

4 相似语料去重

在训练数据中,经常会出现过于重复或相似的语料,如果量过大,不仅会影响模型训练效果,还可能导致资源浪费。通过LLM的语义理解能力结合向量相似度计算,我们可以更准确地识别和处理这些相似语料(可以先计算相似度,再让大模型或人工质检第二遍)

这部分写的相对粗糙一些,核心的思想是计算一个Q下所有的语料,然后全部执行嵌入,再计算相似度,利用阈值(如similarity_threshold=0.90)来控制相似的程度,得到相似组。

注意点:

1. 相似度阈值选择

  • 不同场景可能需要不同的阈值
  • 建议先小规模测试不同阈值的效果

2. 分组处理

  • 记得Q值分组处理,避免不同场景的语料互相干扰

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import time
import logging
from openai import OpenAI
import os
from tqdm import tqdm

# 设置日志
logging.basicConfig(
    filename='similarity_check.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def get_embedding(text, retry_count=3):
    """获取文本的嵌入向量,包含重试机制"""
    for attempt in range(retry_count):
        try:
            client = OpenAI(
                api_key= 'sk-aaxxxxxxxxxxxxxxxxxxxxxxx',
                base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
            )
            
            response = client.embeddings.create(
                model="text-embedding-v3",
                input=text,
                dimensions=1024,
                encoding_format="float"
            )
            
            return response.data[0].embedding
        except Exception as e:
            if attempt == retry_count - 1:
                logging.error(f"获取嵌入向量失败: {text[:50]}... 错误: {str(e)}")
                return None
            time.sleep(2 ** attempt)  # 指数退避

def process_embeddings(df, batch_size=100):
    """批量处理嵌入向量"""
    if 'embedding' not in df.columns:
        df['embedding'] = None
    
    # 获取需要处理的行
    mask = df['embedding'].isna()
    rows_to_process = df[mask]
    
    if len(rows_to_process) > 0:
        logging.info(f"需要处理 {len(rows_to_process)} 条数据的嵌入向量")
        
        for i in tqdm(range(0, len(rows_to_process), batch_size)):
            batch = rows_to_process.iloc[i:i+batch_size]
            for idx, row in batch.iterrows():
                embedding = get_embedding(row['text'])  # 假设文本列名为'text'
                if embedding is not None:
                    df.at[idx, 'embedding'] = embedding
            
            # 定期保存
            if i % (batch_size * 5) == 0:
                df.to_csv('embeddings_checkpoint.csv', index=False)
                
    return df

def mark_similar_entries(df, similarity_threshold=0.95, batch_size=1000):
    """优化的相似度检查函数"""
    print(f"开始标记相似条目,数据共有{len(df)}条")
    
    # 初始化标记列
    df['is_duplicate'] = False
    df['group_id'] = None
    df['similarity_score'] = None
    
    # 预处理嵌入向量
    embeddings = []
    valid_indices = []
    
    # 收集有效的嵌入向量
    for idx, row in df.iterrows():
        try:
            # 直接将embedding转换为numpy数组
            emb = np.array(row['embedding'])
            
            # 确保向量维度正确
            if emb.size > 0:  # 检查向量非空
                embeddings.append(emb)
                valid_indices.append(idx)
                
        except Exception as e:
            print(f"处理向量时出错 (索引 {idx}): {str(e)}")
            continue
    
    print(f"成功处理 {len(embeddings)} 个有效向量")
    
    if not embeddings:
        print("警告: 没有找到有效的嵌入向量!")
        return df
    
    embeddings = np.array(embeddings)
    print(f"向量数组形状: {embeddings.shape}")
    
    # 分批处理相似度计算
    for i in tqdm(range(0, len(embeddings), batch_size)):
        batch_end = min(i + batch_size, len(embeddings))
        batch_embeddings = embeddings[i:batch_end]
        batch_indices = valid_indices[i:batch_end]
        
        # 对每个批次中的向量
        for j in range(len(batch_indices)):
            idx_i = batch_indices[j]
            
            # 如果已经被标记为重复,跳过
            if df.at[idx_i, 'is_duplicate']:
                continue
            
            # 确保有后续向量可比较
            remaining_start = i + j + 1
            if remaining_start >= len(embeddings):
                continue
            
            # 获取剩余向量
            remaining_embeddings = embeddings[remaining_start:]
            
            if len(remaining_embeddings) > 0:
                # 计算相似度
                current_vector = batch_embeddings[j].reshape(1, -1)
                similarities = cosine_similarity(current_vector, remaining_embeddings)
                similar_indices = np.where(similarities[0] > similarity_threshold)[0]
                
                if len(similar_indices) > 0:
                    df.at[idx_i, 'group_id'] = idx_i
                    
                    for sim_idx, sim_score in zip(similar_indices, similarities[0][similar_indices]):
                        actual_idx = valid_indices[remaining_start + sim_idx]
                        if not df.at[actual_idx, 'is_duplicate']:
                            df.at[actual_idx, 'is_duplicate'] = True
                            df.at[actual_idx, 'group_id'] = idx_i
                            df.at[actual_idx, 'similarity_score'] = sim_score
                            print(f"发现相似条目: {idx_i} 和 {actual_idx}, 相似度: {sim_score:.4f}")
        
        # 定期保存检查点
        if i % (batch_size * 2) == 0 and i > 0:
            checkpoint_file = f'similarity_checkpoint_{i}.csv'
            df.to_csv(checkpoint_file, index=False, encoding='gbk')
            print(f"保存检查点: {checkpoint_file}")
    
    # 统计结果
    duplicate_count = df['is_duplicate'].sum()
    group_count = len(df[df['group_id'].notna() & ~df['is_duplicate']])
    
    print(f"\n处理完成:")
    print(f"- 重复数据数量: {duplicate_count}")
    print(f"- 重复数据占比: {duplicate_count/len(df)*100:.2f}%")
    print(f"- 相似组数量: {group_count}")
    
    return df



def main():
    try:
        # 读取数据
        df = pd.read_csv('data.csv', encoding='gbk')
        logging.info(f"成功读取数据,共{len(df)}行")
        
        # 处理嵌入向量
        df = process_embeddings(df)
        
        # 标记相似条目
        df = mark_similar_entries(df, similarity_threshold=0.92)
        
        # 保存结果
        output_file = f'similarity_results_{time.strftime("%Y%m%d_%H%M%S")}.csv'
        df.to_csv(output_file, index=False)
        logging.info(f"结果已保存至: {output_file}")
        
    except Exception as e:
        logging.error(f"处理过程中出错: {str(e)}")
        raise

if __name__ == "__main__":
    main()

这里因为使用的示例数据,所以没有重复的值。

如果按照预期,最后的结果:

同group会标记到一组。最后可以在让大模型或人工审查一次。

如需大模型,代码示例如下(在上面相似组里取出,让大模型判断,实操的时候一般人工看一下就行):

def verify_with_llm(similar_group):
    """使用LLM验证相似组的合理性"""
    prompt = f"""
    请判断以下语料是否确实语义相似:
    1. {similar_group[0]}
    2. {similar_group[1]}
    只输出"是"或"否"
    """
    # 调用LLM API进行验证

总结

本文介绍了如何利用大语言模型(LLM)来优化传统NLP系统的训练数据,主要包含以下几个方面:

(1) 语料生成

通过精心设计的提示词,利用LLM生成多样化的用户问法变体,丰富训练语料库。关键是要控制生成数量和质量的平衡,并通过提示词引导LLM从用户视角思考。

(2) 语料清洗

利用LLM的语义理解能力,建立智能化的语料筛选机制。通过明确的评估标准,可以有效识别和过滤无效语料,提升数据质量。

(3)相似语料去重

结合向量相似度计算和LLM验证,构建双重过滤机制,避免训练数据中出现过多重复或近似的表达。这既能保证数据的多样性,也能控制数据规模。

在使用LLM时,建议从小批量数据开始测试,逐步调整参数;重视提示词的设计,这直接影响输出质量

希望本文的实践经验能为正在探索LLM应用的团队提供一些参考和启发。欢迎在实践中不断优化和改进这些方法。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐