1. Approach

1.1 Task Abstraction

Chinese word segmentation can be abstracted as a sequence labeling task: every character is assigned one of the BMES tags, and the sentence is then split according to those tags. (B - Begin, M - Middle, E - End, S - Single [a single character forming a word on its own])
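For intuition, here is a tiny hand-worked sketch of the tagging scheme (the sentence and its segmentation are made up for illustration):

segmented = "今天 天气 真 不错"   # hypothetical segmented sentence
tags = []
for word in segmented.split(" "):
    if len(word) == 1:
        tags.append("S")
    else:
        tags += ["B"] + ["M"] * (len(word) - 2) + ["E"]
print(list("".join(segmented.split(" "))))  # ['今', '天', '天', '气', '真', '不', '错']
print(tags)                                 # ['B', 'E', 'B', 'E', 'S', 'B', 'E']

Cutting after every E or S tag recovers the original words.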

1.2 Choosing a Method

A Bi-LSTM is an RNN-based neural network for sequence processing. The defining property of an RNN is that, given an input sequence of length seq_len, it outputs a processed sequence of the same length, which makes it a natural fit for labeling tasks. The LSTM's gating mechanism helps the model capture longer-range context, and the bidirectional structure lets it use information from both the preceding and the following text (think of it as an analogue of bidirectional maximum matching segmentation).
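A quick sanity check of the "same-length output" claim (a standalone sketch; the dimensions here are arbitrary):

import torch
import torch.nn as nn

lstm = nn.LSTM(input_size=8, hidden_size=16, bidirectional=True, batch_first=True)
x = torch.randn(2, 30, 8)   # (batch_size, seq_len, input_size)
out, _ = lstm(x)
print(out.shape)            # torch.Size([2, 30, 32]) -- same seq_len, 2*hidden_size features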

So, Bi-LSTM, I choose you!

1.3 Implementation Steps
  • Data preparation
  • Model construction
  • Training
  • Testing and usage
import torch
import torch.nn as nn
from tqdm import tqdm
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

2. Data Preparation

2.1 The Dataset

The Weibo segmentation dataset is the one used in the NLPCC 2016 word segmentation shared task (just download datasets together with the train, test and dev files inside it). It was annotated by Fudan University from Sina Weibo posts, so the text is more colloquial and covers a range of topics such as economy, sports and the environment.
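Judging from the preprocessing code below, each line in these files is assumed to contain one sentence with its words separated by single spaces (e.g. a made-up line like 今天 天气 真 不错).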

2.2 Loading the Dataset
def load_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = file.readlines()
    print(f"Data size: {len(data)}")
    return data
2.3 Preprocessing

Goal: build a data_loader whose samples are pairs of (character-level encoding of the sentence, sequence of classification labels).

# build the character-to-index vocabulary
def construct_words_map(line_list):
    words_list = list(set("".join(line_list)))
    words_map = {ch: i for i, ch in enumerate(words_list)}
    # 'None' is a special index used for padding and unknown characters
    words_map['None'] = len(words_list)
    return words_map, len(words_list) + 1
# character-level encoding; unknown characters fall back to the 'None' index
def inp_convert(inp, words_map):
    unk = words_map['None']
    return torch.tensor([words_map.get(ch, unk) for ch in inp])
# segmentation dataset class, used to build the DataLoader
class DivDataset(Dataset):
    def __init__(self, line_list, words_map, fix_length=100):
        self.inputs = []
        self.outputs = []
        for line in line_list:
            line = line.strip()
            if not line:
                continue
            # drop the newline and spaces, then encode character by character
            input = inp_convert(line.replace(" ", ""), words_map)
            # split into words on spaces
            words = line.split(" ")
            output = []
            for word in words:
                if not word:    # guard against consecutive spaces
                    continue
                if len(word) > 1:
                    # 0/1/2/3 correspond to the B/M/E/S tags
                    output += [0] + [1]*(len(word)-2) + [2]
                else:
                    output += [3]
            output = torch.tensor(output)

            # pad or truncate to a fixed length
            if input.size(0) < fix_length:
                input = F.pad(input, (0, fix_length - input.size(0)), value=words_map['None'])
                output = F.pad(output, (0, fix_length - output.size(0)), value=-1)
            else:
                input = input[:fix_length]
                output = output[:fix_length]

            self.inputs.append(input)
            self.outputs.append(output)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_data = self.inputs[idx]
        output_data = self.outputs[idx]
        return input_data, output_data
train_data = load_data('./datasets/train.dat')
test_data = load_data("./datasets/dev.dat")
words_map, vocab_size = construct_words_map(train_data + test_data)
train_dataset = DivDataset(train_data, words_map)
train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)
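A quick sanity check (just a sketch) to confirm the shapes coming out of the loader:

sample_inp, sample_out = next(iter(train_loader))
print(sample_inp.shape, sample_out.shape)   # expected: torch.Size([10, 100]) torch.Size([10, 100])
print(f"Vocab size: {vocab_size}")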

3. Model Construction

3.1 Defining the Model Structure
class BiLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_labels):
        super().__init__()
        # embedding layer (padded positions use the words_map['None'] index;
        # their labels are -1 and are ignored by the loss)
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # bidirectional LSTM; each direction uses hidden_dim//2 so the
        # concatenated output has size hidden_dim
        self.lstm = nn.LSTM(embed_dim, hidden_dim//2, 
                            num_layers=1, bidirectional=True, batch_first=True)
        # fully connected layer for per-character multi-class classification
        self.fc = nn.Linear(hidden_dim, num_labels)
        
    def forward(self, x):
        # x: (batch_size, seq_len)
        embeds = self.embedding(x)  # (batch_size, seq_len, embed_dim)
        lstm_out, _ = self.lstm(embeds)  # lstm_out: (batch_size, seq_len, hidden_dim)
        logits = self.fc(lstm_out)  # (batch_size, seq_len, num_labels)
        return logits
3.2 Instantiating the Model and Moving It to the Device
def get_device():
    return torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
embed_dim = 128
hidden_dim = 64
num_label = 4
device = get_device()
model = BiLSTM(vocab_size, embed_dim, hidden_dim, num_label).to(device)
print(device)
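Before training, a quick forward pass on one batch (again just a sketch) confirms that the output shape matches (batch_size, seq_len, num_labels):

with torch.no_grad():
    dummy_inp, _ = next(iter(train_loader))
    dummy_logits = model(dummy_inp.to(device))
print(dummy_logits.shape)   # expected: torch.Size([10, 100, 4])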

4. Training the Model

# training function
def train_model(model, train_loader, optimizer, criterion, device, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_preds = 0
        total_preds = 0
        
        for batch_data in tqdm(train_loader):
            inp, ans = batch_data
            inp = inp.to(device)
            ans = ans.to(device)

            optimizer.zero_grad()   # zero the gradients (otherwise they accumulate)
            logits = model(inp)     # forward pass
            
            loss = criterion(
                logits.view(-1, logits.size(-1)),   # (batch_size*seq_len, num_labels)
                ans.view(-1)                        # (batch_size*seq_len)
            )   # flatten the dimensions before computing the loss
            
            # backward pass and optimizer step
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()     # accumulate metrics
            
            # compute accuracy
            preds = torch.argmax(logits, dim=2)  # (batch_size, seq_len)
            mask = (ans != -1)  # mask of valid (non-padded) positions
            
            correct_preds += torch.sum((preds == ans) & mask).item()
            total_preds += torch.sum(mask).item()

        # epoch-level statistics
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct_preds / total_preds if total_preds > 0 else 0
        
        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Training Loss: {epoch_loss:.4f} | Accuracy: {epoch_acc:.4f}\n')
    
    print("Training complete!")
    return model
# define the optimizer and the loss function, then train (and you, my friend, are the real hyperparameter-tuning master)

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.001,          # initial learning rate
    weight_decay=1e-4  # L2 regularization (helps against overfitting)
)

criterion = nn.CrossEntropyLoss(
    ignore_index=-1  # ignore the padded positions
)

train_model(model, train_loader, optimizer, criterion, device)

5. Model Evaluation

def evaluate(model, val_loader):
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for inp, ans in val_loader:
            inp = inp.to(device)
            ans = ans.to(device)
            logits = model(inp)

            preds = torch.argmax(logits, dim=2).cpu()
            # only count the valid (non-padded) positions; since padding is
            # appended at the end, the valid positions are the first valid_len
            for i in range(len(inp)):
                valid_len = torch.sum(ans[i] != -1).item()
                valid_pred = preds[i][:valid_len].to("cpu")
                valid_label = ans[i][:valid_len].to("cpu")

                correct += (valid_pred == valid_label).sum().item()
                total += valid_len
    
    acc = correct / total
    print(f'Validation Accuracy: {acc:.4f}')
    return acc
test_dataset = DivDataset(test_data, words_map)
test_loader = DataLoader(test_dataset, batch_size=10, shuffle=True)
acc = evaluate(model, test_loader)
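The number above is per-character tag accuracy. Segmentation quality is more commonly reported as word-level precision/recall/F1; a minimal sketch (assuming the 0/1/2/3 = B/M/E/S label ids used above) that could be adapted for that:

def tags_to_spans(tags):
    # convert a B/M/E/S tag-id sequence into a set of (start, end) word spans
    spans, start = [], 0
    for i, t in enumerate(tags):
        if t in (0, 3):          # B or S: a new word starts here
            start = i
        if t in (2, 3):          # E or S: the current word ends here
            spans.append((start, i + 1))
    return set(spans)

def word_f1(pred_tags, gold_tags):
    pred, gold = tags_to_spans(pred_tags), tags_to_spans(gold_tags)
    tp = len(pred & gold)
    p = tp / len(pred) if pred else 0.0
    r = tp / len(gold) if gold else 0.0
    return 2 * p * r / (p + r) if (p + r) > 0 else 0.0

Inside evaluate, valid_pred.tolist() and valid_label.tolist() for each sentence could be fed to word_f1 and averaged.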

6. Saving and Loading the Model

The approach here saves and loads the entire model, architecture and weights together. Alternatively, torch.save(model.state_dict(), save_path) saves only the weights, but then the model has to be instantiated before the weights are loaded (a sketch of this is shown after the code below).

# save the model; the file path should end in .pth
def save_model(model, save_path):
    torch.save(model, save_path)

# load the model
# note: on recent PyTorch versions (2.6+) torch.load may need weights_only=False
# to unpickle a full model object
def load_model(load_path, device):
    model = torch.load(load_path).to(device)
    return model
save_model(model, "./model/seperate.pth")
model_ = load_model("./model/seperate.pth", device)
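For completeness, a minimal sketch of the weights-only alternative mentioned above (the file path is a placeholder):

# save only the weights
torch.save(model.state_dict(), "./model/seperate_weights.pth")

# to load, the model must be constructed first with the same hyperparameters
model_w = BiLSTM(vocab_size, embed_dim, hidden_dim, num_label).to(device)
model_w.load_state_dict(torch.load("./model/seperate_weights.pth", map_location=device))
model_w.eval()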

7. Trying Out the Model

num2label = {
    0: 'B',
    1: 'M',
    2: 'E',
    3: 'S'
}

# call on a single sentence: returns the label of each character, and prints and returns the segmented result
def word_seperate(model, words, show=True):
    input = inp_convert(words, words_map)
    input = input.unsqueeze(0).to(device)
    logits = model(input)[0]
    max_indices = torch.argmax(logits, dim=1)
    
    s = ""
    labels = []
    for i, num in enumerate(max_indices):
        s += words[i]
        if num >= 2 and i != len(words) - 1:
            s += " "
        labels.append(num2label[num.item()])
    if show:
        print(s)
    return labels, s

play_data = load_data('./datasets/test.dat')
import random
length = len(play_data)
id = random.randint(0, length - 1)
# you can replace words with any sentence you want to segment
words = play_data[id].strip()
labels, _ = word_seperate(model_, words)
print(labels)

Coming up next: part-of-speech tagging

P.S. If you also need to convert an ipynb file to markdown, you can try:

pip install nbconvert
jupyter nbconvert --to markdown your_notebook.ipynb