[NLP Primer] Lab 1.1 - Chinese Word Segmentation with an LSTM (Complete Code Included)
A complete walkthrough of Chinese word segmentation with an LSTM: dataset preparation, model construction, training, and testing.
1. Approach
1.1 Framing the task
Chinese word segmentation can be framed as a sequence labeling task: every character receives one of the BMES tags, and the sentence is then split into words according to those tags. (B - Begin, M - Middle, E - End, S - Single, a single character that forms a word on its own)
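For instance, here is a quick illustration of the tagging scheme (the sentence and its segmentation are a toy example of my own, not taken from the dataset):

sentence = "我爱自然语言处理"
segmented = ["我", "爱", "自然语言", "处理"]  # gold segmentation: 我 / 爱 / 自然语言 / 处理
tags = []
for word in segmented:
    if len(word) == 1:
        tags.append("S")
    else:
        tags += ["B"] + ["M"] * (len(word) - 2) + ["E"]
print(list(zip(sentence, tags)))
# [('我', 'S'), ('爱', 'S'), ('自', 'B'), ('然', 'M'), ('语', 'M'), ('言', 'E'), ('处', 'B'), ('理', 'E')]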
1.2 Choosing a method
Bi-LSTM is an RNN-based network for sequence processing. An RNN maps an input sequence of length seq_len to a processed output sequence of the same length, which makes it a natural fit for labeling tasks. The LSTM's gating mechanism helps the model capture longer-range dependencies, and running it bidirectionally lets it draw on both the preceding and the following text (think of bidirectional maximum matching in dictionary-based segmentation).
So, Bi-LSTM, I choose you!
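As a concrete illustration (a standalone snippet of my own, not part of the lab code), a bidirectional LSTM in PyTorch produces one output vector per input position, with the forward and backward hidden states concatenated:

import torch
import torch.nn as nn

demo_lstm = nn.LSTM(input_size=8, hidden_size=16, bidirectional=True, batch_first=True)
x = torch.randn(1, 5, 8)   # (batch, seq_len, input_size)
out, _ = demo_lstm(x)
print(out.shape)           # torch.Size([1, 5, 32]): seq_len is preserved, features are 2 * hidden_size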
1.3 Implementation plan
- Data preparation
- Model construction
- Training
- Testing and usage
import torch
import torch.nn as nn
from tqdm import tqdm
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
2. Data Preparation
2.1 About the dataset
The Weibo segmentation dataset used in the NLPCC2016 word segmentation shared task (just download the datasets directory together with its train, test, and dev files). It was annotated by Fudan University from Sina Weibo posts, so the text is fairly colloquial and covers a range of topics such as economics, sports, and the environment.
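Each line of the files holds one sentence with the gold words separated by spaces (the preprocessing below relies on this); a made-up example line would look roughly like:

这 件 事情 我们 已经 处理 好 了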
2.2 Loading the data
def load_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = file.readlines()
    print(f"Data size: {len(data)}")
    return data
2.3 Preprocessing
Goal: build a data_loader whose samples are pairs of (character-level encoding of the sentence, sequence of per-character labels).
# Build the character vocabulary
def construct_words_map(line_list):
    words_list = list("".join(line_list))
    words_list = list(set(words_list))
    num = len(words_list)
    words_map = {}
    for i in range(num):
        words_map[words_list[i]] = i
    words_map['None'] = num  # reserve the last index for padding / unseen characters
    return words_map, num + 1

# Character-level encoding
def inp_convert(inp, words_map):
    lis = []
    for ch in inp:
        # fall back to the 'None' index for characters that are not in the vocabulary
        lis.append(words_map.get(ch, words_map['None']))
    return torch.tensor(lis)
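A quick sanity check of the two helpers on a toy line (my own example, not part of the original lab):

toy_map, toy_size = construct_words_map(["今天 天气 不错\n"])
print(toy_size)                        # 8: 7 distinct characters (including the space and newline) + 1 for 'None'
print(inp_convert("今天天气", toy_map))  # a tensor of four character indices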
# Segmentation dataset class, used to build the DataLoader
class DivDataset(Dataset):
    def __init__(self, line_list, words_map, fix_length=100):
        self.inputs = []
        self.outputs = []
        for line in line_list:
            line = line.strip()
            # strip the newline, then encode the characters with the spaces removed
            input = inp_convert(line.replace(" ", ""), words_map)
            # split on spaces to recover the gold words
            words = line.split(" ")
            output = []
            for word in words:
                if len(word) > 1:
                    output += [0] + [1]*(len(word)-2) + [2]  # B M ... E
                else:
                    output += [3]  # S
            output = torch.tensor(output)
            # pad or truncate to a fixed length
            if input.size(0) < fix_length:
                input = F.pad(input, (0, fix_length - input.size(0)), value=words_map['None'])
                output = F.pad(output, (0, fix_length - output.size(0)), value=-1)
            else:
                input = input[:fix_length]
                output = output[:fix_length]
            self.inputs.append(input)
            self.outputs.append(output)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_data = self.inputs[idx]
        output_data = self.outputs[idx]
        return input_data, output_data
train_data = load_data('./datasets/train.dat')
test_data = load_data("./datasets/dev.dat")
words_map, vocab_size = construct_words_map(train_data + test_data)
train_dataset = DivDataset(train_data, words_map)
train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)
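Before building the model, it helps to peek at one batch and confirm the shapes (a small check of my own, assuming the dataset files loaded correctly):

inp_batch, ans_batch = next(iter(train_loader))
print(inp_batch.shape, ans_batch.shape)  # expected: torch.Size([10, 100]) torch.Size([10, 100])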
3. Building the Model
3.1 Defining the model architecture
class BiLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_labels):
        super().__init__()
        # embedding layer; the last vocabulary index is the 'None' padding token
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=vocab_size-1)
        # bidirectional LSTM; each direction gets hidden_dim//2, so the concatenated output is hidden_dim
        self.lstm = nn.LSTM(embed_dim, hidden_dim//2,
                            num_layers=1, bidirectional=True, batch_first=True)
        # fully connected layer for the per-character classification
        self.fc = nn.Linear(hidden_dim, num_labels)

    def forward(self, x):
        # x: (batch_size, seq_len)
        embeds = self.embedding(x)       # (batch_size, seq_len, embed_dim)
        lstm_out, _ = self.lstm(embeds)  # (batch_size, seq_len, hidden_dim)
        logits = self.fc(lstm_out)       # (batch_size, seq_len, num_labels)
        return logits
3.2 Instantiating the model and moving it to the device
def get_device():
    return torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
embed_dim = 128
hidden_dim = 64
num_label = 4
device = get_device()
model = BiLSTM(vocab_size, embed_dim, hidden_dim, num_label).to(device)
print(device)
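A quick shape check with a dummy batch (again my own addition, just to confirm the wiring):

dummy = torch.randint(0, vocab_size, (2, 100)).to(device)
print(model(dummy).shape)  # expected: torch.Size([2, 100, 4])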
4. Training the Model
# Training function
def train_model(model, train_loader, optimizer, criterion, device, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_preds = 0
        total_preds = 0
        for batch_data in tqdm(train_loader):
            inp, ans = batch_data
            inp = inp.to(device)
            ans = ans.to(device)
            optimizer.zero_grad()  # clear gradients (otherwise they accumulate across steps)
            logits = model(inp)    # forward pass
            loss = criterion(
                logits.view(-1, logits.size(-1)),  # (batch_size*seq_len, num_labels)
                ans.view(-1)                       # (batch_size*seq_len)
            )  # flatten the dimensions before computing the loss
            # backward pass and optimizer step
            loss.backward()
            optimizer.step()
            running_loss += loss.item()  # accumulate the loss
            # accuracy over the valid positions
            preds = torch.argmax(logits, dim=2)  # (batch_size, seq_len)
            mask = (ans != -1)                   # mask of valid (non-padding) positions
            correct_preds += torch.sum((preds == ans) & mask).item()
            total_preds += torch.sum(mask).item()
        # per-epoch statistics
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct_preds / total_preds if total_preds > 0 else 0
        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Training Loss: {epoch_loss:.4f} | Accuracy: {epoch_acc:.4f}\n')
    print("Training complete!")
    return model
# Define the optimizer and loss function, then train (and you, my friend, are the real master of hyperparameter tuning)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.001,           # initial learning rate
    weight_decay=1e-4   # L2 regularization (helps prevent overfitting)
)
criterion = nn.CrossEntropyLoss(
    ignore_index=-1     # ignore the padded positions
)
train_model(model, train_loader, optimizer, criterion, device)
5. Testing the Model
def evaluate(model, val_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inp, ans in val_loader:
            inp = inp.to(device)
            ans = ans.to(device)
            logits = model(inp)
            preds = torch.argmax(logits, dim=2).cpu()
            # only count the valid (non-padding) part of each sequence
            for i in range(len(inp)):
                valid_len = (ans[i] != -1).sum().item()
                valid_pred = preds[i][:valid_len].to("cpu")
                valid_label = ans[i][:valid_len].to("cpu")
                correct += (valid_pred == valid_label).sum().item()
                total += valid_len
    acc = correct / total
    print(f'Validation Accuracy: {acc:.4f}')
    return acc
test_dataset = DivDataset(test_data, words_map)
test_loader = DataLoader(test_dataset, batch_size=10, shuffle=True)
acc = evaluate(model, test_loader)
6. Saving and Loading the Model
The approach shown here saves and loads the entire model, architecture and weights included. Alternatively, torch.save(model.state_dict(), save_path) saves only the weights, but then the model has to be instantiated first before loading; a sketch of that alternative follows the code below.
# Save the model; the file suffix should be .pth
def save_model(model, save_path):
    torch.save(model, save_path)

# Load the model
def load_model(load_path, device):
    # note: on recent PyTorch versions loading a whole pickled model may require torch.load(load_path, weights_only=False)
    model = torch.load(load_path).to(device)
    return model
save_model(model, "./model/seperate.pth")
model_ = load_model("./model/seperate.pth", device)
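For completeness, here is a minimal sketch of the weights-only alternative mentioned above (the file name is just an example):

# save only the weights
torch.save(model.state_dict(), "./model/seperate_weights.pth")
# to load them, the model must first be constructed with the same hyperparameters
model_w = BiLSTM(vocab_size, embed_dim, hidden_dim, num_label).to(device)
model_w.load_state_dict(torch.load("./model/seperate_weights.pth", map_location=device))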
7. Trying the Model Out
num2label = {
    0: 'B',
    1: 'M',
    2: 'E',
    3: 'S'
}
# Run the model on a single sentence: returns the per-character tags and the segmented string, printing the latter if show=True
def word_seperate(model, words, show=True):
    input = inp_convert(words, words_map)
    input = input.unsqueeze(0).to(device)
    logits = model(input)[0]
    max_indices = torch.argmax(logits, dim=1)
    s = ""
    labels = []
    for i, num in enumerate(max_indices):
        s += words[i]
        # E (2) and S (3) both end a word, so insert a space after them
        if num >= 2 and i != len(words) - 1:
            s += " "
        labels.append(num2label[num.item()])
    if show:
        print(s)
    return labels, s
play_data = load_data('./datasets/test.dat')
import random
length = len(play_data)
id = random.randint(0, length - 1)
# replace words with any sentence you want to segment
words = play_data[id].strip()
labels, _ = word_seperate(model_, words)
print(labels)
Coming up next: part-of-speech tagging.
P.S. If you also need to convert an ipynb file to markdown, you can try
pip install nbconvert
jupyter nbconvert --to markdown your_notebook.ipynb