Overview:

For the voice form-filling feature in June, I used a BERT model fine-tuned on domain data, mainly for named entity recognition (NER) on the user's transcribed text: extracting form fields and their corresponding values from natural-language commands.

The project later switched to pure front-end inference with a GGUF-packaged GPT-style model, so the BERT model was retired. I'm recording the process here before I forget it.

Environment

The work is incremental fine-tuning of BERT for a named entity recognition task, run on a rented server.

  • Base model: google-bert/bert-base-chinese
  • Hardware: 1× TITAN Xp (12 GB)
  • Software: PyTorch 2.5.1, Python 3.12 (Ubuntu 22.04), CUDA 12.4
  • RAM: 15 GB
  • Note: renting an instance on AutoDL is enough, at ¥0.52/hour

Dataset:

BIO tagging

  • B- (Begin): the start of an entity. Example: B-字段 marks the first character of a "字段" (field) entity.
  • I- (Inside): the interior of an entity. Example: I-字段 marks the remaining characters of a "字段" entity.
  • O (Outside): belongs to no entity, i.e. ordinary characters and punctuation.

Example:

Text: 帮我在地址中输入北京市朝阳区 ("help me enter 北京市朝阳区 into 地址 (address)")

Extraction: field entity (地址) / value entity (北京市朝阳区)

Original text: 帮我在地址中输入北京市朝阳区

Tagged: O O O B-字段 I-字段 O O O B-值 I-值 I-值 I-值 I-值 I-值

Explanation

Character-level tagging (one label per character):

  • 帮我在 → all "O": these characters are not part of any entity
  • 地址 → B-字段 + I-字段: a "字段" (field) entity
  • 中输入 → all "O": not an entity
  • 北京市朝阳区 → B-值 + 5× I-值: a "值" (value) entity
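
Writing these label lists by hand is error-prone. Below is a minimal sketch of a generator that derives the labels from the text plus the known field/value substrings; make_bio_labels is a hypothetical helper for illustration, not part of the project code.

def make_bio_labels(text: str, field: str, value: str) -> list[str]:
    """Character-level BIO labels for one sample (B-字段/I-字段, B-值/I-值, O)."""
    labels = ['O'] * len(text)

    def tag(span: str, name: str):
        start = text.find(span)  # assumes the span occurs exactly once in the text
        if start == -1:
            raise ValueError(f'span {span!r} not found in {text!r}')
        labels[start] = f'B-{name}'
        for i in range(start + 1, start + len(span)):
            labels[i] = f'I-{name}'

    tag(field, '字段')
    tag(value, '值')
    return labels


if __name__ == '__main__':
    # Reproduces the hand-written example above.
    print(make_bio_labels('帮我在地址中输入北京市朝阳区', '地址', '北京市朝阳区'))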

Data format

Following the BIO scheme above, each sample is stored as the raw text plus one label per character, saved as a JSON file and uploaded to the server:

[
  {
    "text": "帮我在地址中输入北京市朝阳区",
    "labels": [
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "帮我在姓名中输入李四",
    "labels": [
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "请帮我填写年龄为25",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "请帮我填写年龄为30",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "请帮我填写学历为硕士",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "请将身份证号设置为示例值",
    "labels": [
      "O",
      "O",
      "B-字段",
      "I-字段",
      "I-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "请将姓名设置为李四",
    "labels": [
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "请将学历设置为本科",
    "labels": [
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "请将邮箱设置为zhangsan@example.com",
    "labels": [
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在地址字段中填入上海市浦东新区",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在电话字段中填入13800138000",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在电话字段中填入13900139000",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在年龄字段中填入25",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在身份证号字段中填入示例值",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "I-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在姓名字段中填入张三",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在学历字段中填入硕士",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "我需要在职业字段中填入工程师",
    "labels": [
      "O",
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我要在电话里填写13900139000",
    "labels": [
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  },
  {
    "text": "我要在姓名里填写李四",
    "labels": [
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值"
    ]
  },
  {
    "text": "我要在邮箱里填写zhangsan@example.com",
    "labels": [
      "O",
      "O",
      "O",
      "B-字段",
      "I-字段",
      "O",
      "O",
      "O",
      "B-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值",
      "I-值"
    ]
  }
]
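
Before uploading, it is worth verifying that every sample carries exactly one label per character, since a single off-by-one silently corrupts training. A quick standalone sanity check (check_labels is a hypothetical name; the path matches the config below):

import json

def check_labels(path):
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    for i, item in enumerate(data):
        # Character-level tagging: len(text) must equal len(labels).
        if len(item['text']) != len(item['labels']):
            print(f"sample {i}: {len(item['text'])} chars vs "
                  f"{len(item['labels'])} labels -> {item['text']}")
    print(f'checked {len(data)} samples')

if __name__ == '__main__':
    check_labels('/root/autodl-tmp/ner/dataset/train.json')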

Dataset class

File: ner_dataset.py

Purpose: loads and preprocesses the NER data, implementing the standard Dataset methods __len__ and __getitem__.


import json
import torch
from torch.utils.data import Dataset
from transformers import BertTokenizerFast


class NERDataset(Dataset):
    def __init__(self, json_path, label_list=None, max_length=512, pretrained_model='bert-base-chinese'):
        with open(json_path, 'r', encoding='utf-8') as f:
            self.data = json.load(f)
        self.tokenizer = BertTokenizerFast.from_pretrained(pretrained_model, local_files_only=True)
        self.max_length = max_length
        if label_list is None:
            label_list = ['O', 'B-字段', 'I-字段', 'B-值', 'I-值']
        self.label2id = {label: idx for idx, label in enumerate(label_list)}
        self.id2label = {idx: label for label, idx in self.label2id.items()}
        print(f"数据集加载完成,共{len(self.data)}个样本")
        print(f"标签映射: {self.label2id}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        text = item['text']
        labels = item['labels']
        tokens = list(text)
        encoding = self.tokenizer(
            tokens,
            is_split_into_words=True,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        word_ids = encoding.word_ids()
        # Special tokens ([CLS], [SEP], padding) have word_id None and get -100,
        # which CrossEntropyLoss ignores; real tokens take the label of the
        # character they came from. word_ids already has length max_length here.
        label_ids = [
            -100 if word_id is None else self.label2id[labels[word_id]]
            for word_id in word_ids
        ]
        encoding['labels'] = torch.tensor(label_ids)
        # Drop the leading batch dimension of 1 so the DataLoader can stack samples.
        for key in ('input_ids', 'attention_mask', 'token_type_ids'):
            if key in encoding:
                encoding[key] = encoding[key].squeeze(0)
        return encoding


def test_dataset():
    print("=== 测试NER数据集 ===")
    MAX_LENGTH = 512
    DATA_PATH = '/root/autodl-tmp/ner/dataset/train.json'
    MODEL_NAME = '/root/autodl-tmp/llm/google-bert/bert-base-chinese'
    LABEL_LIST = ['O', 'B-字段', 'I-字段', 'B-值', 'I-值']
    try:
        train_dataset = NERDataset(
            DATA_PATH,
            label_list=LABEL_LIST,
            max_length=MAX_LENGTH,
            pretrained_model=MODEL_NAME
        )
        print(f"数据集创建成功,共{len(train_dataset)}个样本")
        for i in range(min(3, len(train_dataset))):
            print(f"\n--- 样本 {i} ---")
            original_text = train_dataset.data[i]['text']
            original_labels = train_dataset.data[i]['labels']
            processed_data = train_dataset[i]
            print(f"原始文本: {original_text}")
            print(f"原始标签: {original_labels}")
            print(f"输入ID形状: {processed_data['input_ids'].shape}")
            print(f"标签形状: {processed_data['labels'].shape}")
            print(f"注意力掩码形状: {processed_data['attention_mask'].shape}")
            decoded_text = train_dataset.tokenizer.decode(processed_data['input_ids'], skip_special_tokens=True)
            print(f"解码后文本: {decoded_text}")
            label_ids = processed_data['labels'].tolist()
            mapped_labels = [train_dataset.id2label.get(lid, 'PAD') for lid in label_ids if lid != -100]
            print(f"映射后标签: {mapped_labels}")
            print("-" * 50)
        print("数据集测试完成!")
        return True
    except Exception as e:
        print(f"数据集测试失败: {e}")
        return False


if __name__ == "__main__":
    test_dataset()

Fine-tuning

Configuration

File: config.py

Purpose: centralizes training hyperparameters, model path, and dataset paths; learning rate, epoch count, device, and checkpoint paths are all set here.

import os


class NERConfig:
    DATA_PATH = '/root/autodl-tmp/ner/dataset/train.json'
    VAL_DATA_PATH = '/root/autodl-tmp/ner/dataset/val.json'
    MODEL_NAME = '/root/autodl-tmp/llm/google-bert/bert-base-chinese'
    TOKENIZER_PATH = '/root/autodl-tmp/llm/google-bert/bert-base-chinese'
    LABEL_LIST = ['O', 'B-字段', 'I-字段', 'B-值', 'I-值']
    EPOCH = 3000  # deliberately oversized; training is stopped by hand (see the early-stopping note below)
    BATCH_SIZE = 20
    LEARNING_RATE = 2e-5
    MAX_LENGTH = 512
    DEVICE = 'cuda'
    SAVE_DIR = 'params'
    BEST_MODEL_PATH = 'params/best_model.pth'
    EVAL_STEPS = 10
    RANDOM_SEED = 42
    USE_AMP = False
    MAX_GRAD_NORM = 1.0

    @classmethod
    def create_save_dir(cls):
        if not os.path.exists(cls.SAVE_DIR):
            os.makedirs(cls.SAVE_DIR)
            print(f"创建保存目录: {cls.SAVE_DIR}")

    @classmethod
    def get_device(cls):
        import torch
        device = torch.device(cls.DEVICE if torch.cuda.is_available() else "cpu")
        print(f"使用设备: {device}")
        if device.type == 'cuda':
            print(f"GPU名称: {torch.cuda.get_device_name()}")
            print(f"GPU显存: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB")
        return device

    @classmethod
    def set_random_seed(cls):
        import torch
        import numpy as np
        import random
        torch.manual_seed(cls.RANDOM_SEED)
        torch.cuda.manual_seed(cls.RANDOM_SEED)
        torch.cuda.manual_seed_all(cls.RANDOM_SEED)
        np.random.seed(cls.RANDOM_SEED)
        random.seed(cls.RANDOM_SEED)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        print(f"随机种子设置为: {cls.RANDOM_SEED}")

    @classmethod
    def print_config(cls):
        print("=" * 50)
        print("NER任务配置信息")
        print("=" * 50)
        print(f"数据集路径: {cls.DATA_PATH}")
        print(f"验证集路径: {cls.VAL_DATA_PATH}")
        print(f"模型路径: {cls.MODEL_NAME}")
        print(f"分词器路径: {cls.TOKENIZER_PATH}")
        print(f"标签列表: {cls.LABEL_LIST}")
        print(f"标签数量: {len(cls.LABEL_LIST)}")
        print(f"\n训练参数:")
        print(f"  训练轮次: {cls.EPOCH}")
        print(f"  批次大小: {cls.BATCH_SIZE}")
        print(f"  学习率: {cls.LEARNING_RATE}")
        print(f"  最大序列长度: {cls.MAX_LENGTH}")
        print(f"  评估步数: {cls.EVAL_STEPS}")
        print(f"\n设备配置:")
        device = cls.get_device()
        print(f"  设备: {device}")
        print(f"\n保存配置:")
        print(f"  保存目录: {cls.SAVE_DIR}")
        print(f"  最佳模型路径: {cls.BEST_MODEL_PATH}")
        print(f"\n其他配置:")
        print(f"  随机种子: {cls.RANDOM_SEED}")
        print(f"  混合精度训练: {cls.USE_AMP}")
        print(f"  梯度裁剪阈值: {cls.MAX_GRAD_NORM}")
        print("=" * 50)

    @classmethod
    def validate_config(cls):
        errors = []
        if not os.path.exists(cls.DATA_PATH):
            errors.append(f"训练数据文件不存在: {cls.DATA_PATH}")
        if not os.path.exists(cls.MODEL_NAME):
            errors.append(f"模型路径不存在: {cls.MODEL_NAME}")
        if cls.BATCH_SIZE <= 0:
            errors.append(f"批次大小必须大于0: {cls.BATCH_SIZE}")
        if cls.LEARNING_RATE <= 0:
            errors.append(f"学习率必须大于0: {cls.LEARNING_RATE}")
        if cls.MAX_LENGTH <= 0:
            errors.append(f"最大序列长度必须大于0: {cls.MAX_LENGTH}")
        if errors:
            print("配置验证失败:")
            for error in errors:
                print(f"  ❌ {error}")
            return False
        else:
            print("✅ 配置验证通过")
            return True


def test_config():
    print("=== 测试NER配置 ===")
    NERConfig.print_config()
    is_valid = NERConfig.validate_config()
    NERConfig.set_random_seed()
    NERConfig.create_save_dir()
    if is_valid:
        print("配置测试完成!")
        return True
    else:
        print("配置测试失败!")
        return False


if __name__ == "__main__":
    test_config()

Running it creates a params/ folder under the current directory for saving model weights.
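
config.py also expects a validation file at VAL_DATA_PATH, but the steps above only produce train.json. A minimal sketch for carving one out (the 80/20 split ratio and the in-place overwrite of train.json are my assumptions):

import json
import random

SRC = '/root/autodl-tmp/ner/dataset/train.json'
VAL = '/root/autodl-tmp/ner/dataset/val.json'

random.seed(42)
with open(SRC, 'r', encoding='utf-8') as f:
    data = json.load(f)
random.shuffle(data)
split = int(len(data) * 0.8)  # 80% train, 20% validation
with open(SRC, 'w', encoding='utf-8') as f:
    json.dump(data[:split], f, ensure_ascii=False, indent=2)
with open(VAL, 'w', encoding='utf-8') as f:
    json.dump(data[split:], f, ensure_ascii=False, indent=2)
print(f'train: {split}, val: {len(data) - split}')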

Training script

File: ner_train.py

Purpose: the NER training script; trains the model to extract form fields and values from natural-language commands.

import torch
from torch.utils.data import DataLoader
from transformers import BertForTokenClassification
from torch.optim import AdamW
from ner_dataset import NERDataset
import torch.nn as nn
from config import NERConfig
import time
from datetime import datetime


def setup_environment():
    print("=" * 60)
    print("开始设置训练环境")
    print("=" * 60)
    device = NERConfig.get_device()
    NERConfig.set_random_seed()
    NERConfig.create_save_dir()
    if not NERConfig.validate_config():
        raise ValueError("配置验证失败,请检查配置文件")
    print("环境设置完成!")
    return device


def create_data_loader():
    print("正在创建数据加载器...")
    train_dataset = NERDataset(
        NERConfig.DATA_PATH,
        label_list=NERConfig.LABEL_LIST,
        max_length=NERConfig.MAX_LENGTH,
        pretrained_model=NERConfig.TOKENIZER_PATH
    )

    def collate_fn(data):
        try:
            if not data:
                raise ValueError("数据列表为空")
            batch = {}
            for key in data[0].keys():
                try:
                    batch[key] = torch.stack([item[key] for item in data])
                except Exception as e:
                    print(f"处理键 '{key}' 时出现错误: {e}")
                    print(f"数据形状: {[item[key].shape for item in data]}")
                    raise
            return batch
        except Exception as e:
            print(f"collate_fn 出现错误: {e}")
            raise

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=NERConfig.BATCH_SIZE,
        shuffle=True,
        drop_last=True,
        collate_fn=collate_fn
    )
    print(f"数据加载器创建完成,共{len(train_dataset)}个样本,{len(train_loader)}个批次")
    return train_loader


def create_model(device):
    print("正在创建模型...")
    model = BertForTokenClassification.from_pretrained(
        NERConfig.MODEL_NAME,
        num_labels=len(NERConfig.LABEL_LIST)
    )
    model = model.to(device)
    print(f"模型创建完成,标签数量: {len(NERConfig.LABEL_LIST)}")
    return model


def create_optimizer(model):
    optimizer = AdamW(model.parameters(), lr=NERConfig.LEARNING_RATE)
    print(f"优化器创建完成,学习率: {NERConfig.LEARNING_RATE}")
    return optimizer


def evaluate_model(model, data_loader, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            token_type_ids = batch.get('token_type_ids', None)
            if token_type_ids is not None:
                token_type_ids = token_type_ids.to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
            logits = outputs.logits
            loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
            loss = loss_fct(logits.view(-1, len(NERConfig.LABEL_LIST)), labels.view(-1))
            total_loss += loss.item()
            predictions = logits.argmax(dim=-1)
            valid_mask = labels != -100
            correct += (predictions[valid_mask] == labels[valid_mask]).sum().item()
            total += valid_mask.sum().item()
    avg_loss = total_loss / len(data_loader)
    accuracy = correct / total if total > 0 else 0
    return avg_loss, accuracy


def train_epoch(model, train_loader, optimizer, device, epoch):
    try:
        model.train()
        total_loss = 0
        total_correct = 0
        total_valid = 0
        start_time = time.time()
        for batch_idx, batch in enumerate(train_loader):
            try:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                token_type_ids = batch.get('token_type_ids', None)
                if token_type_ids is not None:
                    token_type_ids = token_type_ids.to(device)
                labels = batch['labels'].to(device)
                try:
                    if token_type_ids is not None:
                        outputs = model(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
                    else:
                        outputs = model(input_ids, attention_mask=attention_mask)
                    logits = outputs.logits
                except Exception as e:
                    print(f"前向传播错误: {e}")
                    print(f"input_ids shape: {input_ids.shape}")
                    print(f"attention_mask shape: {attention_mask.shape}")
                    print(f"token_type_ids: {token_type_ids}")
                    raise
                # Manual loss with ignore_index=-100 masks special tokens and padding
                # (equivalent to what the model computes when labels= is passed).
                loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
                loss = loss_fct(logits.view(-1, len(NERConfig.LABEL_LIST)), labels.view(-1))
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                predictions = logits.argmax(dim=-1)
                valid_mask = labels != -100
                correct = 0
                if valid_mask.sum() > 0:
                    correct = (predictions[valid_mask] == labels[valid_mask]).sum().item()
                    total_correct += correct
                    total_valid += valid_mask.sum().item()
                if batch_idx % 5 == 0:
                    batch_acc = correct / valid_mask.sum().item() if valid_mask.sum() > 0 else 0
                    print(f"Epoch {epoch}, Batch {batch_idx}/{len(train_loader)}, "
                          f"Loss: {loss.item():.4f}, Acc: {batch_acc:.4f}")
            except Exception as e:
                print(f"批次 {batch_idx} 训练时出现错误: {e}")
                continue
        avg_loss = total_loss / len(train_loader) if len(train_loader) > 0 else 0
        avg_acc = total_correct / total_valid if total_valid > 0 else 0
        epoch_time = time.time() - start_time
        print(f"Epoch {epoch} 完成 - "
              f"平均损失: {avg_loss:.4f}, "
              f"平均准确率: {avg_acc:.4f}, "
              f"用时: {epoch_time:.2f}秒")
        return avg_loss, avg_acc
    except Exception as e:
        print(f"Epoch {epoch} 训练过程中出现错误: {e}")
        import traceback
        traceback.print_exc()
        return 0.0, 0.0


def save_model(model, epoch, loss, accuracy):
    model_path = f"{NERConfig.SAVE_DIR}/epoch_{epoch}_acc_{accuracy:.4f}_bert.pth"
    torch.save(model.state_dict(), model_path)
    info_path = f"{NERConfig.SAVE_DIR}/epoch_{epoch}_acc_{accuracy:.4f}_info.txt"
    with open(info_path, 'w', encoding='utf-8') as f:
        f.write(f"Epoch: {epoch}\n")
        f.write(f"Loss: {loss:.4f}\n")
        f.write(f"Accuracy: {accuracy:.4f}\n")
        f.write(f"Save Time: {datetime.now()}\n")
        f.write(f"Performance-based save: True\n")
    print(f"性能提升模型已保存: {model_path}")


def train_model():
    print("=" * 60)
    print("开始NER模型训练")
    print("=" * 60)
    try:
        device = setup_environment()
        train_loader = create_data_loader()
        model = create_model(device)
        optimizer = create_optimizer(model)
        print(f"\n开始训练,共{NERConfig.EPOCH}个epoch...")
        best_accuracy = 0
        training_history = []
        for epoch in range(NERConfig.EPOCH):
            loss, accuracy = train_epoch(model, train_loader, optimizer, device, epoch)
            training_history.append({
                'epoch': epoch,
                'loss': loss,
                'accuracy': accuracy
            })
            # Save whenever accuracy improves on the best seen so far.
            if accuracy > best_accuracy:
                save_model(model, epoch, loss, accuracy)
                best_accuracy = accuracy
                best_model_path = f"{NERConfig.SAVE_DIR}/best_model.pth"
                torch.save(model.state_dict(), best_model_path)
                print(f"新的最佳模型已保存,准确率: {best_accuracy:.4f}")
            if epoch % 100 == 0:
                checkpoint_path = f"{NERConfig.SAVE_DIR}/checkpoint_epoch_{epoch}.pth"
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': loss,
                    'accuracy': accuracy
                }, checkpoint_path)
                print(f"检查点已保存: {checkpoint_path}")
            if epoch % NERConfig.EVAL_STEPS == 0:
                print(f"当前最佳准确率: {best_accuracy:.4f}")
        print("\n" + "=" * 60)
        print("训练完成!")
        print(f"最佳准确率: {best_accuracy:.4f}")
        print(f"最佳模型路径: {NERConfig.SAVE_DIR}/best_model.pth")
        print("=" * 60)
        return True
    except Exception as e:
        print(f"训练过程中出现错误: {e}")
        return False


if __name__ == '__main__':
    success = train_model()
    if success:
        print("✅ 训练成功完成!")
    else:
        print("❌ 训练失败!")

Early stopping: with this small a training set the model plateaus after roughly 40 epochs, and by epoch 200 the loss was already down to 0.0007, so I simply stopped training by hand.
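
Manual stopping works, but ner_train.py already defines evaluate_model and config.py already points at VAL_DATA_PATH, so a patience-based early stop is easy to wire in. A sketch of the loop (the patience value and the val_loader construction are assumptions, not part of the repo):

# Sketch: drop-in replacement for the fixed-length epoch loop in train_model().
best_val_loss = float('inf')
patience, bad_epochs = 20, 0
for epoch in range(NERConfig.EPOCH):
    train_epoch(model, train_loader, optimizer, device, epoch)
    # val_loader would be built like train_loader, from NERConfig.VAL_DATA_PATH.
    val_loss, val_acc = evaluate_model(model, val_loader, device)
    if val_loss < best_val_loss:
        best_val_loss, bad_epochs = val_loss, 0
        torch.save(model.state_dict(), f"{NERConfig.SAVE_DIR}/best_model.pth")
    else:
        bad_epochs += 1
        if bad_epochs >= patience:
            print(f"Early stop: no val-loss improvement for {patience} epochs")
            break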

Inference and evaluation

File: ner_inference.py

Purpose: loads the trained weights and runs predictions, extracting form fields and values from natural-language commands.

import torch
from transformers import BertTokenizerFast, BertForTokenClassification
from config import NERConfig


class NERPredictor:
    def __init__(self, model_path, tokenizer_path, label_list, device='cuda'):
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")
        print(f"使用设备: {self.device}")
        self.label_list = label_list
        self.label2id = {label: idx for idx, label in enumerate(label_list)}
        self.id2label = {idx: label for label, idx in self.label2id.items()}
        print("正在加载分词器...")
        self.tokenizer = BertTokenizerFast.from_pretrained(tokenizer_path)
        print("正在加载模型...")
        self.model = BertForTokenClassification.from_pretrained(
            tokenizer_path,
            num_labels=len(label_list)
        )
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.to(self.device)
        self.model.eval()
        print("模型加载完成!")

    def predict(self, text, max_length=512):
        print(f"正在预测文本: {text}")
        tokens = list(text)
        encoding = self.tokenizer(
            tokens,
            is_split_into_words=True,
            truncation=True,
            padding='max_length',
            max_length=max_length,
            return_tensors='pt'
        )
        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)
        token_type_ids = encoding.get('token_type_ids', None)
        if token_type_ids is not None:
            token_type_ids = token_type_ids.to(self.device)
        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
            predictions = outputs.logits.argmax(dim=-1)
        word_ids = encoding.word_ids()
        predicted_labels = []
        token_to_word = []
        for i, word_id in enumerate(word_ids):
            if word_id is not None:
                predicted_label = self.id2label[predictions[0][i].item()]
                predicted_labels.append(predicted_label)
                token_to_word.append(word_id)
        entities = self.extract_entities(text, predicted_labels, token_to_word)
        return predicted_labels, entities

    def extract_entities(self, text, labels, token_to_word):
        entities = []
        current_entity = None
        current_start = None
        current_end = None
        for i, (label, word_id) in enumerate(zip(labels, token_to_word)):
            if label.startswith('B-'):
                if current_entity is not None:
                    entity_text = text[current_start:current_end + 1]
                    entities.append({
                        'type': current_entity,
                        'text': entity_text,
                        'start': current_start,
                        'end': current_end + 1
                    })
                current_entity = label[2:]
                current_start = word_id
                current_end = word_id
            elif label.startswith('I-') and current_entity is not None:
                if label[2:] == current_entity:
                    current_end = word_id
                else:
                    entity_text = text[current_start:current_end + 1]
                    entities.append({
                        'type': current_entity,
                        'text': entity_text,
                        'start': current_start,
                        'end': current_end + 1
                    })
                    current_entity = None
                    current_start = None
                    current_end = None
            else:
                if current_entity is not None:
                    entity_text = text[current_start:current_end + 1]
                    entities.append({
                        'type': current_entity,
                        'text': entity_text,
                        'start': current_start,
                        'end': current_end + 1
                    })
                    current_entity = None
                    current_start = None
                    current_end = None
        if current_entity is not None:
            entity_text = text[current_start:current_end + 1]
            entities.append({
                'type': current_entity,
                'text': entity_text,
                'start': current_start,
                'end': current_end + 1
            })
        return entities

    def extract_field_value_pairs(self, entities):
        field_value_pairs = {}
        fields = [e for e in entities if e['type'] == '字段']
        values = [e for e in entities if e['type'] == '值']
        for i, field in enumerate(fields):
            if i < len(values):
                field_value_pairs[field['text']] = values[i]['text']
        return field_value_pairs


def test_model():
    print("=" * 60)
    print("开始测试NER模型")
    print("=" * 60)
    MODEL_PATH = "params/best_model.pth"
    TOKENIZER_PATH = NERConfig.TOKENIZER_PATH
    LABEL_LIST = NERConfig.LABEL_LIST
    test_texts = [
        "帮我在地址中输入北京市朝阳区",
        "请帮我填写年龄为25",
        "请将姓名设置为李四",
        "我需要在电话字段中填入13800138000",
        "请将邮箱设置为zhangsan@example.com",
        "请帮我填写学历为硕士",
        "我需要在身份证号字段中填入示例值"
    ]
    try:
        print("正在创建预测器...")
        predictor = NERPredictor(MODEL_PATH, TOKENIZER_PATH, LABEL_LIST)
        print("\n" + "=" * 60)
        print("NER模型测试结果")
        print("=" * 60)
        for i, text in enumerate(test_texts, 1):
            print(f"\n--- 测试 {i}: {text} ---")
            predicted_labels, entities = predictor.predict(text)
            print(f"预测标签: {predicted_labels}")
            if entities:
                print("提取的实体:")
                for entity in entities:
                    print(f"  - 类型: {entity['type']}, "
                          f"文本: '{entity['text']}', "
                          f"位置: {entity['start']}-{entity['end']}")
            else:
                print("未提取到实体")
            field_value_pairs = predictor.extract_field_value_pairs(entities)
            if field_value_pairs:
                print("结构化结果:")
                for field, value in field_value_pairs.items():
                    print(f"  {field}: {value}")
            else:
                print("未提取到字段-值对")
            print("-" * 50)
        print("\n测试完成!")
        return True
    except FileNotFoundError:
        print("❌ 模型文件未找到,请先训练模型")
        print(f"期望的模型路径: {MODEL_PATH}")
        return False
    except Exception as e:
        print(f"❌ 测试过程中出现错误: {e}")
        return False


def interactive_test():
    print("=" * 60)
    print("交互式NER测试")
    print("=" * 60)
    MODEL_PATH = "params/best_model.pth"
    TOKENIZER_PATH = NERConfig.TOKENIZER_PATH
    LABEL_LIST = NERConfig.LABEL_LIST
    try:
        print("正在加载模型...")
        predictor = NERPredictor(MODEL_PATH, TOKENIZER_PATH, LABEL_LIST)
        print("\n模型加载完成!现在可以输入文本进行测试。")
        print("输入 'quit' 或 'exit' 退出测试。")
        print("-" * 50)
        while True:
            text = input("\n请输入要测试的文本: ").strip()
            if text.lower() in ['quit', 'exit', '退出']:
                print("退出测试。")
                break
            if not text:
                print("请输入有效的文本。")
                continue
            try:
                predicted_labels, entities = predictor.predict(text)
                print(f"\n预测结果:")
                print(f"输入文本: {text}")
                print(f"预测标签: {predicted_labels}")
                if entities:
                    print("提取的实体:")
                    for entity in entities:
                        print(f"  - {entity['type']}: '{entity['text']}'")
                    field_value_pairs = predictor.extract_field_value_pairs(entities)
                    if field_value_pairs:
                        print("结构化结果:")
                        for field, value in field_value_pairs.items():
                            print(f"  {field}: {value}")
                    else:
                        print("未提取到字段-值对")
                else:
                    print("未提取到实体")
            except Exception as e:
                print(f"预测过程中出现错误: {e}")
    except FileNotFoundError:
        print("❌ 模型文件未找到,请先训练模型")
    except Exception as e:
        print(f"❌ 交互式测试出现错误: {e}")


if __name__ == "__main__":
    print("选择测试模式:")
    print("1. 自动测试(使用预设文本)")
    print("2. 交互式测试(手动输入文本)")
    choice = input("请输入选择 (1 或 2): ").strip()
    if choice == "1":
        success = test_model()
        if success:
            print("✅ 自动测试完成!")
        else:
            print("❌ 自动测试失败!")
    elif choice == "2":
        interactive_test()
    else:
        print("无效选择,运行自动测试...")
        test_model()

Merge and export

File: model_merge.py

Purpose: bundles the base model and the fine-tuned weights into one complete model directory for deployment.

import torch
from transformers import BertTokenizerFast, BertForTokenClassification


def create_deployment_model():
    print("正在创建部署模型...")
    model = BertForTokenClassification.from_pretrained("/root/autodl-tmp/llm/google-bert/bert-base-chinese", num_labels=5)
    # map_location='cpu' keeps the export runnable on machines without a GPU.
    model.load_state_dict(torch.load("/root/autodl-tmp/ner/codes/params/best_model.pth", map_location='cpu'))
    model.save_pretrained("deployment_model")
    tokenizer = BertTokenizerFast.from_pretrained("/root/autodl-tmp/llm/google-bert/bert-base-chinese")
    tokenizer.save_pretrained("deployment_model")
    print("部署模型创建完成!")
    print("文件位置: deployment_model/")


def ner_predict_deployed(text, model_path="deployment_model"):
    model = BertForTokenClassification.from_pretrained(model_path)
    tokenizer = BertTokenizerFast.from_pretrained(model_path)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()
    label_list = ['O', 'B-字段', 'I-字段', 'B-值', 'I-值']
    id2label = {idx: label for idx, label in enumerate(label_list)}
    tokens = list(text)
    encoding = tokenizer(
        tokens,
        is_split_into_words=True,
        truncation=True,
        padding='max_length',
        max_length=512,
        return_tensors='pt'
    )
    with torch.no_grad():
        outputs = model(
            encoding['input_ids'].to(device),
            attention_mask=encoding['attention_mask'].to(device)
        )
        predictions = outputs.logits.argmax(dim=-1)
    word_ids = encoding.word_ids()
    predicted_labels = []
    token_to_word = []
    for i, word_id in enumerate(word_ids):
        if word_id is not None:
            predicted_label = id2label[predictions[0][i].item()]
            predicted_labels.append(predicted_label)
            token_to_word.append(word_id)
    entities = extract_entities_simple(text, predicted_labels, token_to_word)
    field_value_pairs = {}
    fields = [e for e in entities if e['type'] == '字段']
    values = [e for e in entities if e['type'] == '值']
    for i, field in enumerate(fields):
        if i < len(values):
            field_value_pairs[field['text']] = values[i]['text']
    return field_value_pairs


def extract_entities_simple(text, labels, token_to_word):
    entities = []
    current_entity = None
    current_start = None
    current_end = None
    for i, (label, word_id) in enumerate(zip(labels, token_to_word)):
        if label.startswith('B-'):
            if current_entity is not None:
                entity_text = text[current_start:current_end + 1]
                entities.append({'type': current_entity, 'text': entity_text})
            current_entity = label[2:]
            current_start = word_id
            current_end = word_id
        elif label.startswith('I-') and current_entity is not None:
            if label[2:] == current_entity:
                current_end = word_id
        else:
            if current_entity is not None:
                entity_text = text[current_start:current_end + 1]
                entities.append({'type': current_entity, 'text': entity_text})
                current_entity = None
    if current_entity is not None:
        entity_text = text[current_start:current_end + 1]
        entities.append({'type': current_entity, 'text': entity_text})
    return entities


if __name__ == "__main__":
    create_deployment_model()
    test_text = "帮我在姓名中输入李四"
    result = ner_predict_deployed(test_text)
    print(f"测试结果: {result}")

Source code:

https://github.com/zenmathink/2728/tree/main/ner
