简介

        最近要做垂直领域大模型的预训练,准备收集1B tokens的专业领域文本数据。简单介绍一下,fasttext是用于领域分类的小模型,datatrove提供了pepiline处理整个数据清洗的流程,里面做了分布式计算和内存调优,在处理大数据的时候有明显优势。数据清洗的流程是:专业领域关键词过滤——fasttext领域分类过滤。整个流程都在datatrove的pipeline中运行。借此文记录这半个月的实习工作内容。

数据来源与获取

        数据集来源基本都是在Hugging Face,OpendataLab等,需要注意数据获取的合法性和道德问题,确保数据使用符合相关法律法规。

数据预处理

        从上述数据来源获取的数据基本都是jsonl或parquet格式的高质量文件。大神们已经做好基础的数据预处理了,数据文件里面大概会有“id”,“text”或者“content”等字段。

        简单介绍一下,fasttext是一个用于分类的小模型,训练起来比较快,一万条数据大概在半小时训练完成。但是要提前准备一些分类好的数据,例如你要做两分类,则要准备两个几乎等量的数据集:_label1_和_label2_。我最后是保留了5000和10000两个数据量版本的二分类训练,发现5000的效果要好一点。

        至于怎么获取分类数据,就要靠自己分类好了。我是通过脚本调用大模型api,在脚本中编写打分prompt,对原始数据进行打分分类。

关键词过滤

        关键词过滤就根据你所在的垂直领域,编写一个关键词过滤脚本。最好对应fasttext的分类,对专业领域的分类做对应过滤。后续将过滤脚本整合到datatrove的filter中,可以在pipeline中调用。

FastText模型训练

  • 数据准备:前面数据预处理提及的要准备分类数据
  • 文本分词:根据你的语言来设计你自己的分词。!需要特别注意,这里有一个大坑,datatrove在做过滤的时候有专属的分词规则,filter_mode: predict and filter on DOCUMENT, PARAGRAPH or SENTENCE level,你要根据他的规则来做fasttext的分词。我当时是后续才发现,我就延续我自己的jieba分词,然后改写src\datatrove\pipeline\filters\fasttext_filter.py中filter方法。还有一点,修改fasttext_filter.py需要用开发者模型或者import的时候正确导入你的fasttext_filter.py
  • 模型训练过程比较简单,注意数据量的控制就行。最好使用1:1的数据量。我自己的总结少量数据对于高分数据的召回率比较友好,比较宽松,过滤出来的数据量比较多。随着数据量的增多,会提升对低分数据的过滤率,比较严格,过滤出来的数据质量比较好。

Datatrove框架应用

  • Datatrove在大规模数据处理中的优势就是一个字——快。
  • pepiline的流程:读取文件——关键词过滤——fasttext过滤——写入文件。中间还可以插入自定义正则过滤LambdaFilter。
  • 因为我没有部署集群,所有选择LocalPipelineExecutor。需要注意的是,windows下一定要填写start_method="spawn"参数。因为默认参数是Linux系统专用的。

项目实践与挑战

        后续有时间了,我会更新代码到我的仓库(目前已更新了初版的代码,后续会添加脚本说明文档,地址:1cixin/datatrve-fasttext),我还编写了许多tools来处理jsonl和parquet脚本。例如:读取文件结构、分数分布情况、根据分数提取数据、tokens计算和只选取某个区间的数据等。以下是一个pipeline脚本例子:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from src.datatrove.pipeline.filters.car_filter import CarLawFilter, CarManufacturingFilter, CarMoldFilter
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.filters import FastTextClassifierFilter, LambdaFilter
from datatrove.pipeline.writers import JsonlWriter
from datatrove.executor import LocalPipelineExecutor

# 创建处理管道
pipeline = [
    JsonlReader(
        data_folder="./input_data/mold-filter",  # 替换为您的JSONL文件目录
        text_key="text",  # 假设JSONL文件中文本字段名为"text" 
        # id_key="id",     # 假设JSONL文件中ID字段名为"id"
        recursive=False,    # 是否递归读取子目录
        glob_pattern="*.jsonl"  # 只处理jsonl文件
    ),
    
    CarMoldFilter(),    # 自定义关键词过滤

    FastTextClassifierFilter(
        keep_labels=[("mold", 0.5)],    # 标签与阈值
        model_url="D:/Projects/fasttext/model_fasttext_0514.bin", # fasttext模型路径
        filter_mode="DOCUMENT",
        save_labels_in_metadata=True  # 确保保存标签到元数据
    ),


    JsonlWriter(
        # output_folder="./output_data/recall",
        output_folder=r"D:\Projects\datatrove_e\datatrove\output_data\recall",
        output_filename="${rank}_filter.jsonl",    # 默认压缩成jsonl.gz文件
        compression=None,    # 不压缩
    )
]


# 创建并运行本地执行器
executor = LocalPipelineExecutor(
    pipeline=pipeline_1,
    tasks=2,  # 任务数量(根据CPU核心数调整)
    workers=2,  # 并行工作进程数
    logging_dir="./logs",  # 日志目录
    skip_completed=False,  # 禁用自动跳过
    start_method="spawn",  # 使用spawn模式
)


if __name__ == "__main__":
    print("开始处理汽车行业相关数据...")
    executor.run()
    print("数据处理完成!")

            希望这篇文章对您有帮助,如果错误或不足的地方,欢迎指正。

    Logo

    火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

    更多推荐