六、针对分类的微调LLM
本文介绍了如何对预训练的大语言模型进行微调以适应垃圾邮件分类任务。首先构建平衡数据集,使用GPT2 tokenizer构建数据加载器。模型微调时冻结大部分参数,仅微调最后一层transformer块、归一化层和修改后的输出层(2分类)。实验展示了模型在垃圾邮件分类上的应用效果,并提供了模型保存和加载方法。该方法有效利用预训练模型的特征提取能力,通过有限微调实现特定分类任务。
预训练大语言模型后,要使LLM真正应用到生活中的各个领域,还需要根据需求去微调模型。微调LLM的两个主要方式是用于分类的微调和用于执行指令的微调。
分类微调,即模型被训练出来识别一组特定的类别标签。如在消息中过滤垃圾信息和非垃圾信息,从图像中识别不同的植物种类,给新闻分类等。但是,经过分类微调的模型只能预测训练过程中遇到的类别。
而指令微调能够执行更广泛的任务,适合处理需要应对多种任务的模型,指令微调提升了模型基于特定用户指令理解和生成响应的能力。但是,它需要更大的数据集和更多的计算资源来开发精通多种任务的模型。
在此,我们主要使用之前构建并预训练的GPT进行修改和分类微调,让模型对包含垃圾信息和非垃圾信息的邮件文本进行筛选。
1、构建数据集
下载数据集储存在.tsv文件中,并且查看数据的标签分布
# Download the SMS Spam Collection dataset.
import urllib.request
import zipfile
import os
from pathlib import Path

# Source archive URL and local paths; the extracted file is renamed to .tsv.
url = "https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip"
zip_path = "sms+spam+collection.zip"
extracted_path = "sms_spam_collection"
data_file_path = Path(extracted_path) / "SMSSpamCollection.tsv"
def download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path):
    """Download the SMS spam zip archive, extract it, and rename the data file.

    Does nothing if ``data_file_path`` already exists.
    """
    if data_file_path.exists():
        # Bug fix: the original used a plain string, so "{data_file_path}"
        # was printed literally instead of the actual path.
        print(f"{data_file_path} already exists. Skipping download.")
        return
    # Download the archive.
    with urllib.request.urlopen(url) as response:
        with open(zip_path, "wb") as out_file:
            out_file.write(response.read())
    # Extract the archive.
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(extracted_path)
    # The extracted file has no extension; add ".tsv" for clarity.
    original_file_path = Path(extracted_path) / "SMSSpamCollection"
    os.rename(original_file_path, data_file_path)
    print(f"File downloaded and saved as {data_file_path}")
# Fetch the dataset (no-op if the .tsv is already present).
download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path)

# Load the file and inspect the class distribution.
import pandas as pd

df = pd.read_csv(data_file_path, sep="\t", header=None, names=["Label", "Text"])
print(df["Label"].value_counts())
"""
ham 4825
spam 747
Name: Label, dtype: int64
"""
可以看到非垃圾信息(ham)数量远高于垃圾信息(spam),为了建立一个小数据集用于展示,并且平衡数据集类别分布,我们创建一个每个类别均包含747个实例的数据集。
# Build a class-balanced dataset.
def crate_balanced_dataset(df):
    """Return a DataFrame with equal numbers of ham and spam messages.

    All spam rows are kept; an equally sized random subset of ham rows
    (seeded for reproducibility) is drawn to match them.
    (Name kept as-is — "crate" — for compatibility with existing callers.)
    """
    spam_rows = df[df["Label"] == "spam"]
    # Draw exactly as many ham rows as there are spam rows.
    sampled_ham = df[df["Label"] == "ham"].sample(
        spam_rows.shape[0], random_state=123
    )
    # Ham subset first, then all spam rows — same order as before.
    return pd.concat([sampled_ham, spam_rows])
# Balance the dataset and confirm both classes now hold 747 rows.
balanced_df = crate_balanced_dataset(df)
print(balanced_df["Label"].value_counts())
"""
ham 747
spam 747
Name: Label, dtype: int64
"""
将标签修改为0和1,分别代表ham和spam,按照训练集:验证集:测试集=7:1:2比例划分数据集。
# NOTE: the duplicated crate_balanced_dataset definition that was here has
# been removed — it is already defined above. What was actually missing is
# random_split, which the code below calls.
def random_split(df, train_frac, validation_frac):
    """Shuffle ``df`` and split it into train/validation/test frames.

    The test fraction is the remainder (1 - train_frac - validation_frac).
    """
    # Shuffle the whole frame (seeded) so the split is random but reproducible.
    df = df.sample(frac=1, random_state=123).reset_index(drop=True)
    train_end = int(len(df) * train_frac)
    validation_end = train_end + int(len(df) * validation_frac)
    return df[:train_end], df[train_end:validation_end], df[validation_end:]


# Map string labels to integers: ham -> 0, spam -> 1.
balanced_df["Label"] = balanced_df["Label"].map({"ham": 0, "spam": 1})
# Split into train/validation/test = 7:1:2.
train_df, val_df, test_df = random_split(balanced_df, 0.7, 0.1)
# Persist the splits for the SpamDataset class below.
train_df.to_csv("train.csv", index=None)
val_df.to_csv("val.csv", index=None)
test_df.to_csv("test.csv", index=None)
接下来就是构建用于pytorch训练的数据加载器了。之前使用滑动窗口来生成统一大小的文本块,然后分组为批次。当处理包含长度不一的文本消息数据集时,可以截断到数据集中最短消息长度或批次长度,或者填充到数据集中最长消息的长度或批次长度。第一种虽然减少计算开销但是会降低模型性能,所以此处选择第二种填充的方案。
将"<|endoftext|>"的词元ID50256填充到编码的文本消息中。
class SpamDataset(Dataset):
    """Dataset of tokenized SMS messages padded to a common length.

    Each item is a ``(token_ids, label)`` pair of ``torch.long`` tensors.
    Messages longer than ``max_length`` are truncated; shorter ones are
    right-padded with ``pad_token_id`` (GPT-2's "<|endoftext|>" id, 50256).
    """

    def __init__(self, csv_file, tokenizer, max_length=None, pad_token_id=50256):
        self.data = pd.read_csv(csv_file)
        self.encoded_texts = [
            tokenizer.encode(text) for text in self.data["Text"]
        ]
        # Without an explicit cap, pad everything to the longest message.
        self.max_length = (
            self._longest_encoded_length() if max_length is None else max_length
        )
        # Truncate over-long sequences, then right-pad every sequence.
        clipped = [seq[:self.max_length] for seq in self.encoded_texts]
        self.encoded_texts = [
            seq + [pad_token_id] * (self.max_length - len(seq))
            for seq in clipped
        ]

    def __getitem__(self, index):
        token_ids = torch.tensor(self.encoded_texts[index], dtype=torch.long)
        label = torch.tensor(self.data.iloc[index]["Label"], dtype=torch.long)
        return token_ids, label

    def __len__(self):
        return len(self.data)

    def _longest_encoded_length(self):
        # Length of the longest tokenized message (0 for an empty dataset).
        return max((len(seq) for seq in self.encoded_texts), default=0)
构造数据加载器
# GPT-2 BPE tokenizer from tiktoken.
tokenizer = tiktoken.get_encoding("gpt2")
# Validation/test reuse the training max length so their sequences never
# exceed what the model sees during training.
train_dataset = SpamDataset("train.csv", max_length=None, tokenizer=tokenizer)
val_dataset = SpamDataset("val.csv", max_length=train_dataset.max_length, tokenizer=tokenizer)
test_dataset = SpamDataset("test.csv", max_length=train_dataset.max_length, tokenizer=tokenizer)

num_workers = 0
batch_size = 8
torch.manual_seed(123)
# drop_last=True only for training, so every training batch is full-size.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=False)
2、初始化带有预训练权重的模型
参照上一讲,给模型加载预训练权重:下载权重后,使用load_weight_into_gpt方法将其加载到GPTModel实例中。
# Model configuration for the chosen GPT-2 variant.
CHOOSE_MODEL = "gpt2-small (124M)"
INPUT_PROMPT = "Every effort moves"
BASE_CONFIG = {
    "vocab_size": 50257,     # GPT-2 BPE vocabulary size
    "context_length": 1024,  # maximum sequence length
    "drop_rate": 0.0,        # no dropout for fine-tuning
    "qkv_bias": True         # OpenAI's released GPT-2 weights use QKV biases
}
model_configs = {
    "gpt2-small (124M)": {"emb_dim": 768, "num_layers": 12, "num_heads": 12},
    "gpt2-medium (355M)": {"emb_dim": 1024, "num_layers": 24, "num_heads": 16},
    "gpt2-large (774M)": {"emb_dim": 1280, "num_layers": 36, "num_heads": 20},
    "gpt2-xl (1558M)": {"emb_dim": 1600, "num_layers": 48, "num_heads": 25}
}
BASE_CONFIG.update(model_configs[CHOOSE_MODEL])

# Pretrained-weight loading helpers from the previous chapter.
from gpt_download import download_and_load_gpt2
from PreTrain import GPTModel, load_weight_into_gpt

# e.g. "gpt2-small (124M)" -> "124M"
model_size = CHOOSE_MODEL.split(" ")[-1].lstrip("(").rstrip(")")
settings, params = download_and_load_gpt2(model_size=model_size, models_dir="gpt2")
model = GPTModel(BASE_CONFIG)
load_weight_into_gpt(model, params)
model.eval()
3、设置分类头和评估函数
之前构造的gpt模型是用于生成下一个词元的,不符合这里要求的分类任务,所以需要将原始的输出层(映射到包含50257个词汇的词汇表中)修改为一个较小的输出层(只需要包含0和1两个类别即可)。
此外,刚刚我们给模型导入了预训练权重,而微调模型时是不需要微调所有的层的。神经网络模型中,低层捕捉的是基本的语义信息,适用于广泛的任务和数据集,最后几层关注的更多的是细微的语言模式和特定的任务特征,所以我们只需要微调最后几层即可。我们选择微调最后一个transformer块、最终归一化层和输出层。
# Adapt the pretrained model for classification fine-tuning.
def fine_tuning_config(model, num_class, BASE_CONFIG):
    """Freeze the model, then unfreeze only the classification-relevant parts.

    Replaces the LM output head with a ``num_class``-way linear head and
    makes the last transformer block and the final layer norm trainable.
    """
    # Freeze every pretrained parameter first.
    for p in model.parameters():
        p.requires_grad = False
    # Fresh output head: emb_dim -> num_class (trainable by default).
    model.out_head = torch.nn.Linear(
        in_features=BASE_CONFIG["emb_dim"], out_features=num_class
    )
    # Unfreeze the last transformer block and the final normalization layer.
    for module in (model.transformer_blocks[-1], model.final_norm):
        for p in module.parameters():
            p.requires_grad = True
用示例展示下效果,输入维度为(1,4),batch size是1,后面的4是因为输入了四个词元。输出的shape是(1,4,2)是因为最后的输出层输出维度是2。在将其转换成类别标签预测之前,我们只需要取输出向量的最后一行数据(outputs[:, -1, :])即可。因为在GPT模型中常会用到之前说过的因果注意力掩码,这使得当前词元只关注当前位置及之前位置的词元,而序列中最后一个词元无疑是关注的词元数量最多,其包含的信息也是最多的。因此,在分类任务中,微调过程中只需要关注最后一个即可。
num_class = 2
fine_tuning_config(model, num_class, BASE_CONFIG)

# Quick demo of the shapes produced by the new classification head.
inputs = tokenizer.encode("Do you have time")
inputs = torch.tensor(inputs).unsqueeze(0)  # add the batch dimension
print("inputs shape:", inputs.shape)  # torch.Size([1, 4])
with torch.no_grad():
    outputs = model(inputs)
print("outputs shape:", outputs.shape)  # torch.Size([1, 4, 2])
最后,引入计算分类精确度和损失方法。基本和上讲一致,只需要关注,输出的logits取向量的最后一行,然后使用argmax取概率最大的索引即可。
# Classification accuracy over (up to num_batches of) a data loader.
def calc_accuracy_loader(data_loader, model, device, num_batches=None):
    """Return the fraction of correctly classified examples.

    Only the last token's logits are used — with causal attention the last
    position attends to the whole sequence. At most ``num_batches`` batches
    are evaluated; all of them when ``num_batches`` is None.
    """
    model.eval()
    correct_preds, num_examples = 0, 0
    # Bug fix: the original tested the unrelated global ``batch_size``
    # instead of ``num_batches`` (NameError outside this script's context).
    if num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))
    for i, (input_batch, target_batch) in enumerate(data_loader):
        # Bug fix: ``i > num_batches`` evaluated one batch too many.
        if i >= num_batches:
            break
        input_batch = input_batch.to(device)
        target_batch = target_batch.to(device)
        with torch.no_grad():
            # Keep only the last token's logits.
            logits = model(input_batch)[:, -1, :]
        # argmax over the class dimension gives the predicted label.
        predict_labels = torch.argmax(logits, dim=-1)
        num_examples += predict_labels.shape[0]
        correct_preds += (predict_labels == target_batch).sum().item()
    return correct_preds / num_examples
# Cross-entropy loss for a single batch.
def calc_loss_batch(input_batch, target_batch, model, device):
    """Cross-entropy between the last-token logits and the class targets."""
    inputs = input_batch.to(device)
    targets = target_batch.to(device)
    # Only the final position's logits matter for classification.
    last_token_logits = model(inputs)[:, -1, :]
    return torch.nn.functional.cross_entropy(last_token_logits, targets)
def calc_loss_loader(data_loader, model, device, num_batches=None):
    """Average ``calc_loss_batch`` over (up to ``num_batches`` of) the loader.

    Returns NaN for an empty loader; evaluates all batches when
    ``num_batches`` is None.
    """
    total_loss = 0
    if len(data_loader) == 0:
        return float("nan")
    elif num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))
    for i, (input_batch, target_batch) in enumerate(data_loader):
        # Bug fix: ``i > num_batches`` summed one extra batch while still
        # dividing by ``num_batches``, inflating the reported average loss.
        if i >= num_batches:
            break
        loss = calc_loss_batch(input_batch, target_batch, model, device)
        total_loss += loss.item()  # .item() detaches the scalar from the graph
    return total_loss / num_batches
4、微调模型
接下来就是训练模型了,基本上和之前类似,唯一的区别就是要计算分类准确率,而不是生成文本样本来评估模型。
def evaluate_model(model, train_loader, val_loader, device, eval_iter):
    """Compute train/val losses over ``eval_iter`` batches in eval mode."""
    model.eval()
    with torch.no_grad():
        losses = [
            calc_loss_loader(loader, model, device, num_batches=eval_iter)
            for loader in (train_loader, val_loader)
        ]
    model.train()  # restore training mode for the caller
    return tuple(losses)
# Training loop for the classifier.
def train_classifier_simple(model, train_loader, val_loader, optimizer, device, epochs, eval_freq, eval_iter):
    """Fine-tune ``model`` and track losses and accuracies.

    Losses are sampled every ``eval_freq`` optimization steps over
    ``eval_iter`` batches; accuracies are computed once per epoch.

    Returns:
        (train_losses, val_losses, train_accs, val_accs, examples_seen)
    """
    train_losses, val_losses, train_accs, val_accs = [], [], [], []
    examples_seen, global_step = 0, -1
    for epoch in range(epochs):
        model.train()
        for input_batch, target_batch in train_loader:
            optimizer.zero_grad()
            loss = calc_loss_batch(input_batch, target_batch, model, device)
            loss.backward()
            optimizer.step()
            examples_seen += input_batch.shape[0]  # count examples, not tokens
            global_step += 1
            # Periodic loss evaluation.
            if global_step % eval_freq == 0:
                train_loss, val_loss = evaluate_model(model, train_loader, val_loader, device, eval_iter)
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                # Bug fix: the original f-string was missing the closing ")".
                print(f"Ep {epoch + 1} (Step {global_step:06d}): Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}")
        # Per-epoch accuracy on a few batches.
        train_accuracy = calc_accuracy_loader(train_loader, model, device, num_batches=eval_iter)
        val_accuracy = calc_accuracy_loader(val_loader, model, device, num_batches=eval_iter)
        print(f"Training accuracy: {train_accuracy}, Validation accuracy: {val_accuracy}")
        # Bug fix: the accuracies were computed but never stored, so the
        # returned train_accs/val_accs lists were always empty.
        train_accs.append(train_accuracy)
        val_accs.append(val_accuracy)
    return train_losses, val_losses, train_accs, val_accs, examples_seen
# Entry point: run the fine-tuning.
def train():
    """Fine-tune the global ``model`` and return the training history."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Bug fix: the batches are moved to ``device`` inside calc_loss_batch,
    # but the model itself never was — this crashes on a CUDA machine.
    model.to(device)
    torch.manual_seed(123)
    # Frozen parameters carry no gradients, so AdamW effectively updates
    # only the unfrozen head, last block, and final norm.
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.1)
    epochs = 10
    train_losses, val_losses, train_accs, val_accs, examples_seen = train_classifier_simple(
        model, train_loader, val_loader, optimizer, device, epochs, eval_freq=50, eval_iter=5
    )
    # Return the history so callers can inspect or plot it
    # (backward-compatible: the original implicitly returned None).
    return train_losses, val_losses, train_accs, val_accs, examples_seen
输入一段文本,查看模型效果
# Classify a single piece of text as spam / not spam.
def classify_review(text, model, tokenizer, device, max_length=None, pad_token_id=50256):
    """Return "spam" or "not spam" for ``text``.

    ``max_length`` should normally be the training set's max length; when
    None, the model's full context length is used.
    """
    model.eval()
    input_ids = tokenizer.encode(text)
    # Bug fix: for nn.Embedding(context_length, emb_dim) the context length
    # is weight.shape[0]; the original read shape[1] (the embedding dim).
    supported_context_length = model.pos_emb.weight.shape[0]
    # Bug fix: ``min(None, ...)`` raised TypeError when max_length was None,
    # even though None is the documented default.
    if max_length is None:
        max_length = supported_context_length
    # Truncate to both the requested length and the model's context window.
    input_ids = input_ids[:min(max_length, supported_context_length)]
    # Right-pad to max_length with the "<|endoftext|>" token id.
    input_ids += [pad_token_id] * (max_length - len(input_ids))
    input_tensor = torch.tensor(input_ids, device=device).unsqueeze(0)
    with torch.no_grad():
        logits = model(input_tensor)[:, -1, :]  # last token's logits only
    predict = torch.argmax(logits, dim=-1).item()
    return "spam" if predict == 1 else "not spam"
# Sanity check on a typical spam message.
# NOTE(review): ``device`` is only defined inside train() above — this call
# assumes a module-level ``device`` exists; define one before running.
text = "You are a winner you have been specially selected to receive $1000 cash"
print(classify_review(text, model, tokenizer, device, max_length=train_dataset.max_length))
# expected output: spam
保存模型权重和加载权重
# Save the fine-tuned weights.
torch.save(model.state_dict(), "review_classifier.pth")
# Load them back (map_location keeps this working on CPU-only machines).
model_state_dict = torch.load("review_classifier.pth", map_location=device)
model.load_state_dict(model_state_dict)
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)